/**
Command line tool that reads TSV files and summarizes field values associated with
equivalent keys.

Copyright (c) 2016-2021, eBay Inc.
Initially written by Jon Degenhardt

License: Boost License 1.0 (http://boost.org/LICENSE_1_0.txt)
*/
module tsv_utils.tsv_summarize;

import std.algorithm : all, any, canFind, each, find, findSplit, map, joiner, splitter;
import std.array : join;
import std.conv : to;
import std.exception : enforce;
import std.format : format;
import std.range;
import std.stdio;
import std.typecons : tuple;
import std.container : DList;

static if (__VERSION__ >= 2085) extern(C) __gshared string[] rt_options = [ "gcopt=cleanup:none" ];

version(unittest)
{
    // When running unit tests, use main from -main compiler switch.
}
else
{
    /** Program entry point. Processes command line arguments, runs tsvSummarize,
     * and converts any exception into an error message and a non-zero exit code.
     */
    int main(string[] cmdArgs)
    {
        /* When running in DMD code coverage mode, turn on report merging. */
        version(D_Coverage) version(DigitalMars)
        {
            import core.runtime : dmd_coverSetMerge;
            dmd_coverSetMerge(true);
        }

        TsvSummarizeOptions cmdopt;
        auto r = cmdopt.processArgs(cmdArgs);
        if (!r[0]) return r[1];    // r[0]: continue processing; r[1]: exit status.

        /* LDC profile builds: reset instrumentation counters after argument
         * processing so the collected profile reflects the main processing work.
         */
        version(LDC_Profile)
        {
            import ldc.profile : resetAll;
            resetAll();
        }
        try tsvSummarize(cmdopt);
        catch (Exception exc)
        {
            stderr.writefln("Error [%s]: %s", cmdopt.programName, exc.msg);
            return 1;
        }
        return 0;
    }
}

/** Verbose help text, printed for '--help-verbose'. The option descriptions are
 * appended by getopt when printed.
 */
auto helpTextVerbose = q"EOS
Synopsis: tsv-summarize [options] file [file...]

tsv-summarize reads tabular data files (tab-separated by default), tracks
field values for each unique key, and runs summarization algorithms. Consider
the file data.tsv:

  Make    Color   Time
  ford    blue    131
  chevy   green   124
  ford    red     128
  bmw     black   118
  bmw     black   126
  ford    blue    122

The min and average times for each make are generated by the command:

  $ tsv-summarize --header --group-by Make --min Time --mean Time data.tsv

This produces:

  Make   Time_min Time_mean
  ford   122      127
  chevy  124      124
  bmw    118      122

Using '--group-by Make,Color' will group by both 'Make' and 'Color'.
Omitting the '--group-by' entirely summarizes fields for the full file.

The previous example uses field names to identify fields. Field numbers
can be used as well. The next two commands are equivalent:

  $ tsv-summarize -H --group-by Make,Color --min Time --mean Time data.tsv
  $ tsv-summarize -H --group-by 1,2 --min 3 --mean 3 data.tsv

The program tries to generate useful headers, but custom headers can be
specified. Example (using -g and -H shortcuts for --header and --group-by):

  $ tsv-summarize -H -g 1 --min 3:Fastest --mean 3:Average data.tsv

Most operators take custom headers in a similar way, generally following:

  --<operator-name> FIELD[:header]

Operators can be specified multiple times. They can also take multiple
fields (though not when a custom header is specified). Examples:

  --median 2,3,4
  --median 2-5,7-11
  --median elapsed_time,system_time,user_time
  --median '*_time'              # Wildcard. All fields ending in '_time'.

The quantile operator requires one or more probabilities after the fields:

  --quantile run_time:0.25       # Quantile 1 of the 'run_time' field
  --quantile 2:0.25              # Quantile 1 of field 2
  --quantile 2-4:0.25,0.5,0.75   # Q1, Median, Q3 of fields 2, 3, 4

Summarization operators available are:
  count       range        mad            values
  retain      sum          var            unique-values
  first       mean         stdev          unique-count
  last        median       mode           missing-count
  min         quantile     mode-count     not-missing-count
  max

Calculated numeric values are printed to 12 significant digits by default.
This can be changed using the '--p|float-precision' option. If six or fewer
it sets the number of significant digits after the decimal point. If
greater than six it sets the total number of significant digits.

Calculations hold onto the minimum data needed while reading data. A few
operations like median keep all data values in memory. These operations will
start to encounter performance issues as available memory becomes scarce. The
size that can be handled effectively is machine dependent, but often quite
large files can be handled.

Operations requiring numeric entries will signal an error and terminate
processing if a non-numeric entry is found.

Missing values are not treated specially by default, this can be changed
using the '--x|exclude-missing' or '--r|replace-missing' option. The former
turns off processing for missing values, the latter uses a replacement value.

Options:
EOS";

/** Standard help text, printed for '--help'. The option descriptions are
 * appended by getopt when printed.
 */
auto helpText = q"EOS
Synopsis: tsv-summarize [options] file [file...]

tsv-summarize runs aggregation operations on fields in tab-separated value
files. Operations can be run against the full input data or grouped by key
fields. Fields can be specified either by field number or field name. Use
'--help-verbose' for more detailed help.

Options:
EOS";

/** Command line options - Container and processing. The processArgs method is used to
 * process the command line.
 */
struct TsvSummarizeOptions {
    import tsv_utils.common.utils : byLineSourceRange, ByLineSourceRange;

    string programName;                /// Program name
    ByLineSourceRange!() inputSources; /// Input Files
    size_t[] keyFields;                /// -g, --group-by
    bool hasHeader = false;            /// --header
    bool writeHeader = false;          /// -w, --write-header
    char inputFieldDelimiter = '\t';   /// --d|delimiter
    char valuesDelimiter = '|';        /// --v|values-delimiter
    size_t floatPrecision = 12;        /// --p|float-precision
    DList!Operator operators;          /// Operators, in the order specified.
    size_t endFieldIndex = 0;          /// Derived value. Max field index used plus one.
    MissingFieldPolicy globalMissingPolicy = new MissingFieldPolicy;   /// Derived value.

    /* tsv-summarize operators require access to the header line when the operator is
     * created. This is because named fields may be used to identify fields. To enable
     * this, a CmdOptionHandler delegate is added to the cmdLineOperatorOptions array
     * during initial processing by std.getopt. The group-by operation is similar,
     * but is added to the cmdLineOtherFieldOptions instead. At least one
     * cmdLineOperatorOptions entry is required.
     *
     * The different handlers are defined after processArgs.
     */

    /* CmdOptionHandler delegate signature - This is the call made to process the command
     * line option arguments after the header line has been read.
     */
    alias CmdOptionHandler = void delegate(bool hasHeader, string[] headerFields);

    private CmdOptionHandler[]  cmdLineOperatorOptions;
    private CmdOptionHandler[]  cmdLineOtherFieldOptions;

    /* Returns a tuple. First value is true if command line arguments were successfully
     * processed and execution should continue, or false if an error occurred or the user
     * asked for help. If false, the second value is the appropriate exit code (0 or 1).
     *
     * Returning true (execution continues) means args have been validated and derived
     * values calculated. In addition, field indices have been converted to zero-based.
     */
    auto processArgs(ref string[] cmdArgs) {
        import std.algorithm : any, each;
        import std.getopt;
        import std.path : baseName, stripExtension;
        import std.typecons : Yes, No;
        import tsv_utils.common.fieldlist : fieldListHelpText;
        import tsv_utils.common.getopt_inorder;
        import tsv_utils.common.utils : throwIfWindowsNewline;

        bool helpVerbose = false;          // --help-verbose
        bool helpFields = false;           // --help-fields
        bool versionWanted = false;        // --V|version
        bool excludeMissing = false;       // --x|exclude-missing
        string missingValueReplacement;    // --r|replace-missing

        programName = (cmdArgs.length > 0) ? cmdArgs[0].stripExtension.baseName : "Unknown_program_name";

        try
        {
            arraySep = ",";    // Use comma to separate values in command line options
            auto r = getoptInorder(
                cmdArgs,
                "help-verbose",       "              Print full help.", &helpVerbose,
                "help-fields",        "              Print help on specifying fields.", &helpFields,

                std.getopt.config.caseSensitive,
                "V|version",          "              Print version information and exit.", &versionWanted,
                std.getopt.config.caseInsensitive,

                "g|group-by",         "<field-list>  Fields to use as key.", &addGroupByOptionHandler,

                std.getopt.config.caseSensitive,
                "H|header",           "              Treat the first line of each file as a header.", &hasHeader,
                std.getopt.config.caseInsensitive,

                "w|write-header",     "              Write an output header even if there is no input header.", &writeHeader,
                "d|delimiter",        "CHR           Field delimiter. Default: TAB. (Single byte UTF-8 characters only.)", &inputFieldDelimiter,
                "v|values-delimiter", "CHR           Values delimiter. Default: vertical bar (|). (Single byte UTF-8 characters only.)", &valuesDelimiter,
                "p|float-precision",  "NUM           'Precision' to use printing floating point numbers. Affects the number of digits printed and exponent use. Default: 12", &floatPrecision,
                "x|exclude-missing",  "              Exclude missing (empty) fields from calculations.", &excludeMissing,
                "r|replace-missing",  "STR           Replace missing (empty) fields with STR in calculations.", &missingValueReplacement,
                "count",              "              Count occurrences of each unique key ('--g|group-by'), or the total number of records if no key field is specified.", &addCountOptionHandler,
                "count-header",       "STR           Count occurrences of each unique key, like '--count', but use STR as the header.", &addCountHeaderOptionHandler,
                "retain",             "<field-list>  Retain one copy of the field.", &addOperatorOptionHandler!RetainOperator,
                "first",              "<field-list>[:STR]  First value seen.", &addOperatorOptionHandler!FirstOperator,
                "last",               "<field-list>[:STR]  Last value seen.", &addOperatorOptionHandler!LastOperator,
                "min",                "<field-list>[:STR]  Min value. (Fields with numeric values only.)", &addOperatorOptionHandler!MinOperator,
                "max",                "<field-list>[:STR]  Max value. (Fields with numeric values only.)", &addOperatorOptionHandler!MaxOperator,
                "range",              "<field-list>[:STR]  Difference between min and max values. (Fields with numeric values only.)", &addOperatorOptionHandler!RangeOperator,
                "sum",                "<field-list>[:STR]  Sum of the values. (Fields with numeric values only.)", &addOperatorOptionHandler!SumOperator,
                "mean",               "<field-list>[:STR]  Mean (average). (Fields with numeric values only.)", &addOperatorOptionHandler!MeanOperator,
                "median",             "<field-list>[:STR]  Median value. (Fields with numeric values only. Reads all values into memory.)", &addOperatorOptionHandler!MedianOperator,
                "quantile",           "<field-list>:p[,p...][:STR]  Quantiles. One or more fields, then one or more 0.0-1.0 probabilities. (Fields with numeric values only. Reads all values into memory.)", &addQuantileOperatorOptionHandler,
                "mad",                "<field-list>[:STR]  Median absolute deviation from the median. Raw value, not scaled. (Fields with numeric values only. Reads all values into memory.)", &addOperatorOptionHandler!MadOperator,
                "var",                "<field-list>[:STR]  Variance. (Sample variance, numeric fields only).", &addOperatorOptionHandler!VarianceOperator,
                "stdev",              "<field-list>[:STR]  Standard deviation. (Sample st.dev, numeric fields only).", &addOperatorOptionHandler!StDevOperator,
                "mode",               "<field-list>[:STR]  Mode. The most frequent value. (Reads all unique values into memory.)", &addOperatorOptionHandler!ModeOperator,
                "mode-count",         "<field-list>[:STR]  Count of the most frequent value. (Reads all unique values into memory.)", &addOperatorOptionHandler!ModeCountOperator,
                "unique-count",       "<field-list>[:STR]  Number of unique values. (Reads all unique values into memory.)", &addOperatorOptionHandler!UniqueCountOperator,
                "missing-count",      "<field-list>[:STR]  Number of missing (empty) fields. Not affected by '--x|exclude-missing' or '--r|replace-missing'.", &addOperatorOptionHandler!MissingCountOperator,
                "not-missing-count",  "<field-list>[:STR]  Number of filled (non-empty) fields. Not affected by '--r|replace-missing'.", &addOperatorOptionHandler!NotMissingCountOperator,
                "values",             "<field-list>[:STR]  All the values, separated by --v|values-delimiter. (Reads all values into memory.)", &addOperatorOptionHandler!ValuesOperator,
                "unique-values",      "<field-list>[:STR]  All the unique values, separated by --v|values-delimiter. (Reads all unique values into memory.)", &addOperatorOptionHandler!UniqueValuesOperator,
                );

            if (r.helpWanted)
            {
                defaultGetoptPrinter(helpText, r.options);
                return tuple(false, 0);
            }
            else if (helpVerbose)
            {
                defaultGetoptPrinter(helpTextVerbose, r.options);
                return tuple(false, 0);
            }
            else if (helpFields)
            {
                writeln(fieldListHelpText);
                return tuple(false, 0);
            }
            else if (versionWanted)
            {
                import tsv_utils.common.tsvutils_version;
                writeln(tsvutilsVersionNotice("tsv-summarize"));
                return tuple(false, 0);
            }

            /* Remaining command line args are files. Use standard input if files
             * were not provided. Truncate cmdArgs to consume the arguments.
             */
            string[] filepaths = (cmdArgs.length > 1) ? cmdArgs[1 .. $] : ["-"];
            cmdArgs.length = 1;

            /* Validation and derivations - Do as much validation prior to header line
             * processing as possible (avoids waiting on stdin).
             */

            enforce(!cmdLineOperatorOptions.empty, "At least one summary operator is required.");

            enforce(inputFieldDelimiter != valuesDelimiter,
                    "Cannot use the same character for both --d|field-delimiter and --v|values-delimiter.");

            enforce(!(excludeMissing && missingValueReplacement.length != 0),
                    "Cannot use both '--x|exclude-missing' and '--r|replace-missing'.");

            /* Missing field policy. */
            globalMissingPolicy.updatePolicy(excludeMissing, missingValueReplacement);

            string[] headerFields;

            /* fieldListArgProcessing encapsulates the field list processing. It is
             * called prior to reading the header line if headers are not being used,
             * and after if headers are being used.
             */
            void fieldListArgProcessing()
            {
                /* Run all the operator handlers. */
                cmdLineOtherFieldOptions.each!(dg => dg(hasHeader, headerFields));
                cmdLineOperatorOptions.each!(dg => dg(hasHeader, headerFields));

                /* keyFields need to be part of the endFieldIndex, which is one past
                 * the last field index. */
                keyFields.each!(delegate (size_t x)
                                {
                                    if (x >= endFieldIndex) endFieldIndex = x + 1;
                                } );
            }

            if (!hasHeader) fieldListArgProcessing();

            /*
             * Create the byLineSourceRange and perform header line processing.
             */
            inputSources = byLineSourceRange(filepaths);

            if (hasHeader)
            {
                if (!inputSources.front.byLine.empty)
                {
                    throwIfWindowsNewline(inputSources.front.byLine.front, inputSources.front.name, 1);
                    headerFields = inputSources.front.byLine.front.split(inputFieldDelimiter).to!(string[]);
                }

                fieldListArgProcessing();
            }
        }
        catch (Exception exc)
        {
            stderr.writefln("[%s] Error processing command line arguments: %s", programName, exc.msg);
            return tuple(false, 1);
        }
        return tuple(true, 0);
    }

    /* Registers the deferred handler for the '--g|group-by' option. */
    private void addGroupByOptionHandler(string option, string optionVal)
    {
        cmdLineOtherFieldOptions ~=
            (bool hasHeader, string[] headerFields)
            => groupByOptionHandler(hasHeader, headerFields, option, optionVal);
    }

    /* Parses the '--g|group-by' field list. Called after the header line is available. */
    private void groupByOptionHandler(bool hasHeader, string[] headerFields, string option, string optionVal)
    {
        import tsv_utils.common.fieldlist;

        try
        {
            keyFields =
                optionVal
                .parseFieldList!(size_t, Yes.convertToZeroBasedIndex)(hasHeader, headerFields)
                .array;
        }
        catch (Exception e)
        {
            e.msg = format("[--%s %s]. %s", option, optionVal, e.msg);
            throw e;
        }
    }

    /* Registers the deferred handler for a single-field operator option. */
    private void addOperatorOptionHandler(OperatorClass : SingleFieldOperator)(string option, string optionVal)
    {
        cmdLineOperatorOptions ~=
            (bool hasHeader, string[] headerFields)
            => operatorOptionHandler!OperatorClass(hasHeader, headerFields, option, optionVal);
    }

    /* operatorOptionHandler functions are callbacks that process command line options
     * specifying summarization operations. eg. '--max 5', '--last 3:LastEntry'. Handlers
     * check syntactic correctness and instantiate Operator objects that do the work. This
     * is also where 1-upped field numbers are converted to 0-based indices.
     */
    private void operatorOptionHandler(OperatorClass : SingleFieldOperator)
    (bool hasHeader, string[] headerFields, string option, string optionVal)
    {
        import std.range : enumerate;
        import std.typecons : Yes, No;
        import tsv_utils.common.fieldlist;

        try
        {
            auto optionValParse =
                optionVal
                .parseFieldList!(size_t, Yes.convertToZeroBasedIndex, No.allowFieldNumZero, No.consumeEntireFieldListString)
                (hasHeader, headerFields);

            auto fieldIndices = optionValParse.array;
            bool hasOptionalHeader = optionVal.length > optionValParse.consumed;
            string optionalHeader;

            if (hasOptionalHeader)
            {
                enforce(fieldIndices.length <= 1, "Cannot specify a custom header when using multiple fields.");
                enforce(optionVal.length - optionValParse.consumed > 1,
                        format("No value after field list.\n   Expected: '--%s <field-list>' or '--%s <field>:<header>'.",
                               option, option));
                optionalHeader = optionVal[optionValParse.consumed + 1 .. $].idup;
            }

            foreach (fieldIndex; fieldIndices)
            {
                auto op = new OperatorClass(fieldIndex, globalMissingPolicy);

                if (hasOptionalHeader)
                {
                    enforce(op.allowCustomHeader, "Operator does not support custom headers.");
                    op.setCustomHeader(optionalHeader);
                }

                operators.insertBack(op);
                if (fieldIndex >= endFieldIndex) endFieldIndex = fieldIndex + 1;
            }
        }
        catch (Exception exc)
        {
            import std.format : format;
            exc.msg = format("[--%s %s] %s", option, optionVal, exc.msg);
            throw exc;
        }
    }

    /* Registers the deferred handler for the '--quantile' option. */
    private void addQuantileOperatorOptionHandler(string option, string optionVal)
    {
        cmdLineOperatorOptions ~=
            (bool hasHeader, string[] headerFields)
            => quantileOperatorOptionHandler(hasHeader, headerFields, option, optionVal);
    }

    /* QuantileOperator has a different syntax and needs a custom command option handler. */
    private void quantileOperatorOptionHandler(bool hasHeader, string[] headerFields, string option, string optionVal)
    {
        import std.typecons : Yes, No;
        import tsv_utils.common.fieldlist;

        try
        {
            auto optionValParse =
                optionVal
                .parseFieldList!(size_t, Yes.convertToZeroBasedIndex, No.allowFieldNumZero, No.consumeEntireFieldListString)
                (hasHeader, headerFields);

            auto fieldIndices = optionValParse.array;
            enforce(optionVal.length - optionValParse.consumed > 1, "No probabilities entered.");

            auto splitRemaining =
                optionVal[optionValParse.consumed + 1 .. $]
                .findSplit(":");

            enforce(splitRemaining[1].empty || !splitRemaining[2].empty,
                    "Empty custom header.");

            auto probStr = splitRemaining[0];
            auto header = splitRemaining[2];

            double[] probs;

            foreach (str; probStr.splitter(','))
            {
                double p = str.to!double;
                enforce(p >= 0.0 && p <= 1.0,
                        format("Probability '%g' is not in the interval [0.0,1.0].", p));
                probs ~= p;
            }

            enforce(header.empty || (fieldIndices.length <= 1 && probs.length <= 1),
                    format("Cannot specify a custom header when using multiple fields or multiple probabilities."));

            assert (fieldIndices.length > 0);
            assert (probs.length > 0);
            assert (header.empty || (fieldIndices.length == 1 && probs.length == 1));

            foreach (fieldIndex; fieldIndices)
            {
                foreach (p; probs)
                {
                    auto op = new QuantileOperator(fieldIndex, globalMissingPolicy, p);
                    if (!header.empty) op.setCustomHeader(header);
                    operators.insertBack(op);
                }
                if (fieldIndex >= endFieldIndex) endFieldIndex = fieldIndex + 1;
            }
        }
        catch (Exception e)
        {
            e.msg = format(
                "[--%s %s]. %s\n   Expected: '--%s <field-list>:<prob>[,<prob>]' or '--%s <field>:<prob>:<header>' where <prob> is a number between 0.0 and 1.0.",
                option, optionVal, e.msg, option, option);
            throw e;
        }
    }

    /* Registers the deferred handler for the '--count' option. */
    private void addCountOptionHandler()
    {
        cmdLineOperatorOptions ~=
            (bool hasHeader, string[] headerFields)
            => countOptionHandler(hasHeader, headerFields);
    }

    /* Adds a CountOperator with the default header. */
    private void countOptionHandler(bool hasHeader, string[] headerFields)
    {
        operators.insertBack(new CountOperator());
    }

    /* Registers the deferred handler for the '--count-header' option. */
    private void addCountHeaderOptionHandler(string option, string optionVal)
    {
        cmdLineOperatorOptions ~=
            (bool hasHeader, string[] headerFields)
            => countHeaderOptionHandler(hasHeader, headerFields, option, optionVal);
    }

    /* Adds a CountOperator using the option value as a custom header. */
    private void countHeaderOptionHandler(bool hasHeader, string[] headerFields, string option, string optionVal)
    {
        auto op = new CountOperator();
        op.setCustomHeader(optionVal);
        operators.insertBack(op);
    }
}

/** tsvSummarize does the primary work of the tsv-summarize program.
 *
 * Params:
 *   cmdopt = Fully processed command line options: input sources, key fields,
 *            operators, delimiters, and output settings.
 *
 * Reads each input line, splits it into fields, and feeds it to a Summarizer
 * chosen by the number of key fields. Results are written after all input has
 * been consumed. Throws on malformed lines or non-numeric data passed to
 * numeric operators.
 */
void tsvSummarize(ref TsvSummarizeOptions cmdopt)
{
    import tsv_utils.common.utils : BufferedOutputRange, ByLineSourceRange,
        bufferedByLine, throwIfWindowsNewline;

    /* Check that the input files were setup as expected. Should at least have one
     * input, stdin if nothing else, and newlines removed from the byLine range.
     */
    assert(!cmdopt.inputSources.empty);
    static assert(is(typeof(cmdopt.inputSources) == ByLineSourceRange!(No.keepTerminator)));

    /* BufferedOutputRange is faster than writing directly to stdout if many lines are
     * being written. This will happen mostly when group-by is used.
     */
    auto bufferedOutput = BufferedOutputRange!(typeof(stdout))(stdout);

    /* Pick the Summarizer based on the number of key-fields entered. */
    auto summarizer =
        (cmdopt.keyFields.length == 0)
        ? new NoKeySummarizer!(typeof(bufferedOutput))(
            cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy)

        : (cmdopt.keyFields.length == 1)
        ? new OneKeySummarizer!(typeof(bufferedOutput))(
            cmdopt.keyFields[0], cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy)

        : new MultiKeySummarizer!(typeof(bufferedOutput))(
            cmdopt.keyFields, cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy);

    /* Add the operators to the Summarizer. */
    summarizer.setOperators(inputRangeObject(cmdopt.operators[]));

    /* If there's no input header line, but writing an output header anyway, then
     * write it now. This helps tasks further on in a unix pipeline detect errors
     * quickly, without waiting for all the data to flow through the pipeline.
     */
    auto printOptions = SummarizerPrintOptions(
        cmdopt.inputFieldDelimiter, cmdopt.valuesDelimiter, cmdopt.floatPrecision);

    if (!cmdopt.hasHeader && cmdopt.writeHeader)
    {
        summarizer.writeSummaryHeader(bufferedOutput, printOptions);
        bufferedOutput.flush;
    }

    /* Process each input file, one line at a time. The lineFields buffer is reused
     * across lines; only the first endFieldIndex fields are needed by operators.
     */
    auto lineFields = new char[][](cmdopt.endFieldIndex);
    bool headerFound = false;
    foreach (inputStream; cmdopt.inputSources)
    {
        foreach (lineNum, line; inputStream.byLine.enumerate(1))
        {
            if (lineNum == 1) throwIfWindowsNewline(line, inputStream.name, lineNum);

            /* Copy the needed number of fields to the fields array.
             * Note: The number is zero if no operator needs fields. Notably, the count
             * operator. Used by itself, it counts the number input lines (ala 'wc -l').
             */
            if (cmdopt.endFieldIndex > 0)
            {
                size_t fieldIndex = 0;
                foreach (fieldValue; line.splitter(cmdopt.inputFieldDelimiter))
                {
                    if (fieldIndex == cmdopt.endFieldIndex) break;
                    lineFields[fieldIndex] = fieldValue;
                    fieldIndex++;
                }

                if (fieldIndex == 0)
                {
                    assert(cmdopt.endFieldIndex > 0);
                    assert(line.length == 0);

                    /* Bug work-around. Empty lines are not handled properly by splitter.
                     *   - Bug: https://issues.dlang.org/show_bug.cgi?id=15735
                     *   - Pull Request: https://github.com/D-Programming-Language/phobos/pull/4030
                     * This can arise for: '$ tsv-summarize -g 1 --count'. This counts the
                     * unique values in field 1. If there's only one column, then an empty
                     * line becomes an empty string for field 1. Work-around: Point to the
                     * line. It's an empty string.
                     */
                    lineFields[fieldIndex] = line;
                    fieldIndex++;
                }

                enforce(fieldIndex >= cmdopt.endFieldIndex,
                        format("Not enough fields in line. File: %s, Line: %s",
                               inputStream.name, lineNum));
            }

            if (cmdopt.hasHeader && lineNum == 1)
            {
                /* Only the first file's header is processed; later files' header
                 * lines are skipped (headerFound stays true). */
                if (!headerFound)
                {
                    summarizer.processHeaderLine(lineFields);
                    headerFound = true;

                    /* Write the header now. This helps tasks further on in a unix
                     * pipeline detect errors quickly, without waiting for all the
                     * data to flow through the pipeline. Note that an upstream task
                     * may have flushed its header line, so the header may arrive
                     * long before the main block of data.
                     */
                    summarizer.writeSummaryHeader(bufferedOutput, printOptions);
                    bufferedOutput.flush;
                }
            }
            else
            {
                /* Process the line. Processing will fail (throw) if a field cannot be
                 * converted to the expected type.
                 */
                try summarizer.processNextLine(lineFields);
                catch (Exception exc)
                {
                    throw new Exception(
                        format("Could not process line or field: %s\n  File: %s Line: %s%s",
                               exc.msg, inputStream.name, lineNum,
                               (lineNum == 1) ? "\n  Is this a header line? Use --header to skip." : ""));
                }
            }
        }
    }

    debug writeln("[tsvSummarize] After reading all data.");

    /* Whew! We're done processing input data. Run the calculations and print. */

    summarizer.writeSummaryBody(bufferedOutput, printOptions);
}

/** Generates the default header for a field. This is used when the input data has
 * no header line, but field headers are needed in the output.
 *
 * The generated header has the form "fieldN", where N is the one-based field
 * number corresponding to the zero-based fieldIndex argument.
 */
string fieldHeaderFromIndex(size_t fieldIndex)
{
    immutable fieldNumber = fieldIndex + 1;
    return "field" ~ fieldNumber.to!string;
}

unittest
{
    // fieldHeaderFromIndex takes a zero-based index and produces a 1-upped header.
    assert(fieldHeaderFromIndex(0) == "field1");
    assert(fieldHeaderFromIndex(10) == "field11");
}

/** Builds the output header for a summarized field.
 *
 * Joins the field header and the operation name with an underscore. e.g. A
 * "length" field and a "max" operation produce "length_max". The field header
 * typically comes from a header line in the input data or was constructed by
 * fieldHeaderFromIndex().
 *
 * An empty operationName leaves the field header unchanged. The Retain operator
 * relies on this behavior.
 */
string summaryHeaderFromFieldHeader(string fieldHeader, string operationName)
{
    if (operationName.length == 0) return fieldHeader;
    return fieldHeader ~ "_" ~ operationName;
}

unittest
{
    // A non-empty operation name is appended with an underscore; an empty one is not.
    assert(summaryHeaderFromFieldHeader("originalfield", "mycalc") == "originalfield_mycalc");
    assert(summaryHeaderFromFieldHeader("originalfield", "") == "originalfield");
}

/** SummarizerPrintOptions holds printing options for Summarizers and Calculators. Typically
 * specified with command line options, it is separated out for modularity.
 */
struct SummarizerPrintOptions
{
    char fieldDelimiter;           // Delimiter between output fields.
    char valuesDelimiter;          // Delimiter between values within a single field.
    size_t floatPrecision = 12;    // 'Precision' used when printing floating point numbers.

    import std.traits : isFloatingPoint, isIntegral;

    /** Formats a numeric value for output using the configured floatPrecision.
     * Delegates to tsv_utils.common.numerics.formatNumber.
     */
    auto formatNumber(T)(T n) const
    if (isFloatingPoint!T || isIntegral!T)
    {
        import tsv_utils.common.numerics : formatNumber;
        return formatNumber!T(n, floatPrecision);
    }
}

/** A Summarizer object maintains the state of the summarization and performs basic
 * processing. Handling of files and input lines is left to the caller.
 *
 * Classes supporting the Summarizer must implement the methods:
 *  - setOperators - Called after initializing the object for each operator to be processed.
 *  - processHeaderLine - Called to process the header line of each file. Returns true if
 *    it was the first header line processed (used when reading multiple files).
 *  - processNextLine - Called to process non-header lines.
 *  - writeSummaryHeader - Called to write the header line.
 *  - writeSummaryBody - Called to write the result lines.
 */
interface Summarizer(OutputRange)
{
    /** Called after initializing the object for each operator to be processed. */
    void setOperators(InputRange!Operator op);

    /** Called to process the header line of each file. Returns true if it was the
     *  first header line processed (used when reading multiple files).
     */
    bool processHeaderLine(const char[][] lineFields);

    /** Called to process non-header lines. */
    void processNextLine(const char[][] lineFields);

    /** Called to write the header line. */
    void writeSummaryHeader(ref OutputRange outputStream, const ref SummarizerPrintOptions);

    /** Called to write the result lines. */
    void writeSummaryBody(ref OutputRange outputStream, const ref SummarizerPrintOptions);
}

/** SummarizerBase performs work shared by all summarizers, most everything except for
 * handling of unique keys.
 *
 * The base class handles creation, allocates storage for Operators and SharedFieldValues,
 * and similar. Derived classes deal primarily with unique keys and the associated Calculators
 * and UniqueKeyValuesLists.
 */
class SummarizerBase(OutputRange) : Summarizer!OutputRange
{
    private char _inputFieldDelimiter;
    private bool _hasProcessedFirstHeaderLine = false;
    private SharedFieldValues _sharedFieldValues = null;  // Null if no shared field value lists.
    protected MissingFieldPolicy _missingPolicy;
    protected DList!Operator _operators;
    protected size_t _numOperators = 0;

    this(const char inputFieldDelimiter, MissingFieldPolicy missingPolicy)
    {
        _inputFieldDelimiter = inputFieldDelimiter;
        _missingPolicy = missingPolicy;
    }

    /** The field delimiter used when parsing input lines. */
    char inputFieldDelimiter() const @property
    {
        return _inputFieldDelimiter;
    }

    /** Sets the Operators used by the Summarizer. Called after construction.
     *
     * Each operator is appended to the operator list. If an operator needs raw
     * field values saved (numeric or text), the field indices are registered with
     * a single SharedFieldValues instance, created lazily on first need.
     */
    void setOperators(InputRange!Operator operators)
    {
        foreach (operator; operators)
        {
            _operators.insertBack(operator);
            ++_numOperators;

            auto numericIndices = operator.numericFieldsToSave();
            auto textIndices = operator.textFieldsToSave();

            if (numericIndices.length == 0 && textIndices.length == 0) continue;

            if (_sharedFieldValues is null) _sharedFieldValues = new SharedFieldValues();
            foreach (index; numericIndices) _sharedFieldValues.addNumericIndex(index);
            foreach (index; textIndices) _sharedFieldValues.addTextIndex(index);
        }
    }

    /** Called to process the header line of each file. Returns true if it was the
     *  first header line processed (used when reading multiple files).
     */
    bool processHeaderLine(const char[][] lineFields)
    {
        if (_hasProcessedFirstHeaderLine) return false;

        foreach (operator; _operators) operator.processHeaderLine(lineFields);
        _hasProcessedFirstHeaderLine = true;
        return true;
    }

    /** Returns a new UniqueKeyValuesLists, or null if no operator saves field values. */
    protected final UniqueKeyValuesLists makeUniqueKeyValuesLists()
    {
        if (_sharedFieldValues is null) return null;
        return _sharedFieldValues.makeUniqueKeyValuesLists;
    }

    abstract void processNextLine(const char[][] lineFields);
    abstract void writeSummaryHeader(ref OutputRange outputStream, const ref SummarizerPrintOptions);
    abstract void writeSummaryBody(ref OutputRange outputStream, const ref SummarizerPrintOptions);
}

/** The NoKeySummarizer is used when summarizing values across the entire input.
 *
 * Note: NoKeySummarizer is used in Operator unit tests and gets extensive testing
 * through that mechanism.
 */
final class NoKeySummarizer(OutputRange) : SummarizerBase!OutputRange
{
    private Calculator[] _calculators;
    private UniqueKeyValuesLists _valueLists;

    this(const char inputFieldDelimiter, MissingFieldPolicy missingPolicy)
    {
        super(inputFieldDelimiter, missingPolicy);
    }

    /** Called after initializing the object for each operator to be processed. */
    override void setOperators(InputRange!Operator operators)
    {
        super.setOperators(operators);

        /* Only one Calculator per Operation, so create them as Operators are added.
         *
         * Bug fix: iterate '_operators' (the list populated by the base class), not the
         * 'operators' argument. InputRange!Operator is a single-pass input range and the
         * super.setOperators call above already consumed it; iterating 'operators' here
         * would see an empty range and create no Calculators.
         */
        foreach (op; _operators) _calculators ~= op.makeCalculator;
        _valueLists = super.makeUniqueKeyValuesLists();
    }

     /** Called to process non-header lines. */
    override void processNextLine(const char[][] lineFields)
    {
        _calculators.each!(x => x.processNextLine(lineFields));
        if (_valueLists !is null) _valueLists.processNextLine(lineFields, _missingPolicy);
    }

    /** Called to write the header line. */
    override void writeSummaryHeader(ref OutputRange outputStream, const ref SummarizerPrintOptions printOptions)
    {
        put(outputStream, _operators[].map!(op => op.header).join(printOptions.fieldDelimiter));
        put(outputStream, '\n');
    }

    /** Called to write the result lines. */
    override void writeSummaryBody(ref OutputRange outputStream, const ref SummarizerPrintOptions printOptions)
    {
        put(outputStream,
            _calculators[]
            .map!(x => x.calculate(_valueLists, printOptions))
            .join(printOptions.fieldDelimiter));
        put(outputStream, '\n');
    }
}

/** KeySummarizerBase does work shared by the single key and multi-key summarizers.
 *
 * The primary difference between those two is the formation of the key. The primary
 * reason for separating those into two separate classes is to simplify (speed-up)
 * handling of single field keys, which are the most common use case.
 */
class KeySummarizerBase(OutputRange) : SummarizerBase!OutputRange
{
    /* Per-key state: one Calculator per operator, plus saved field values if needed. */
    protected struct UniqueKeyData
    {
        Calculator[] calculators;
        UniqueKeyValuesLists valuesLists;
    }

    private DList!string _uniqueKeys;              // Keys in first-seen order (output order).
    private UniqueKeyData[string] _uniqueKeyData;  // Key -> per-key state.

    this(const char inputFieldDelimiter, MissingFieldPolicy missingPolicy)
    {
        super(inputFieldDelimiter, missingPolicy);
    }

    /** Routes a data line to the per-key Calculators, creating key state on first sight. */
    protected void processNextLineWithKey(T : const char[])(T key, const char[][] lineFields)
    {
        debug writefln("[%s]: %s", __FUNCTION__, lineFields.to!string);

        auto existingData = (key in _uniqueKeyData);
        auto keyData = (existingData is null) ? addUniqueKey(key.to!string) : *existingData;

        foreach (calculator; keyData.calculators) calculator.processNextLine(lineFields);
        if (keyData.valuesLists !is null) keyData.valuesLists.processNextLine(lineFields, _missingPolicy);
    }

    /** Creates and registers the state for a newly seen key. */
    protected UniqueKeyData addUniqueKey(string key)
    {
        assert(key !in _uniqueKeyData);

        _uniqueKeys.insertBack(key);

        Calculator[] calculators;
        calculators.reserve(_numOperators);
        foreach (op; _operators) calculators ~= op.makeCalculator;

        return _uniqueKeyData[key] = UniqueKeyData(calculators, super.makeUniqueKeyValuesLists());
    }

    override void writeSummaryHeader(ref OutputRange outputStream, const ref SummarizerPrintOptions printOptions)
    {
        put(outputStream, keyFieldHeader());
        put(outputStream, printOptions.fieldDelimiter);
        put(outputStream, _operators[].map!(op => op.header).join(printOptions.fieldDelimiter));
        put(outputStream, '\n');
    }

    override void writeSummaryBody(ref OutputRange outputStream, const ref SummarizerPrintOptions printOptions)
    {
        foreach (key; _uniqueKeys)
        {
            auto keyData = _uniqueKeyData[key];
            put(outputStream, key);
            put(outputStream, printOptions.fieldDelimiter);
            put(outputStream,
                keyData.calculators[]
                .map!(calc => calc.calculate(keyData.valuesLists, printOptions))
                .join(printOptions.fieldDelimiter));
            put(outputStream, '\n');
        }
    }

    /** Header text for the key column(s). Supplied by the derived class. */
    abstract string keyFieldHeader() const @property;
}

/** This Summarizer is for the case where the unique key is based on exactly one field.
 */
final class OneKeySummarizer(OutputRange) : KeySummarizerBase!OutputRange
{
    private size_t _keyFieldIndex = 0;  // Zero-based index of the key field.
    private string _keyFieldHeader;     // From the input header line, or generated.

    this(size_t keyFieldIndex, char inputFieldDelimiter, MissingFieldPolicy missingPolicy)
    {
        super(inputFieldDelimiter, missingPolicy);
        _keyFieldIndex = keyFieldIndex;
        _keyFieldHeader = fieldHeaderFromIndex(keyFieldIndex);
    }

    override string keyFieldHeader() const @property
    {
        return _keyFieldHeader;
    }

    override bool processHeaderLine(const char[][] lineFields)
    {
        /* Bug fix: must be '<', not '<='. lineFields[_keyFieldIndex] is read below,
         * so an index equal to lineFields.length would pass the assert and then
         * trigger an out-of-bounds access. Matches the assert in processNextLine.
         */
        assert(_keyFieldIndex < lineFields.length);

        bool isFirstHeaderLine = super.processHeaderLine(lineFields);
        if (isFirstHeaderLine)
        {
            /* Replace the generated header with the actual field name. */
            _keyFieldHeader = lineFields[_keyFieldIndex].to!string;
        }
        return isFirstHeaderLine;
    }

    override void processNextLine(const char[][] lineFields)
    {
        assert(_keyFieldIndex < lineFields.length);
        processNextLineWithKey(lineFields[_keyFieldIndex], lineFields);
    }
}

/** This Summarizer is for the case where the unique key is based on multiple fields.
 */
final class MultiKeySummarizer(OutputRange) : KeySummarizerBase!OutputRange
{
    private size_t[] _keyFieldIndices;  // Zero-based indices of the key fields.
    private string _keyFieldHeader;     // From the input header line, or generated.

    this(const size_t[] keyFieldIndices, char inputFieldDelimiter, MissingFieldPolicy missingPolicy)
    {
        super(inputFieldDelimiter, missingPolicy);
        _keyFieldIndices = keyFieldIndices.dup;
        _keyFieldHeader =
            _keyFieldIndices.map!(i => fieldHeaderFromIndex(i))
            .join(inputFieldDelimiter);
    }

    override string keyFieldHeader() const @property
    {
        return _keyFieldHeader;
    }

    override bool processHeaderLine(const char[][] lineFields)
    {
        assert(_keyFieldIndices.all!(x => x < lineFields.length));
        assert(_keyFieldIndices.length >= 2);

        bool isFirstHeaderLine = super.processHeaderLine(lineFields);
        if (isFirstHeaderLine)
        {
            /* Replace the generated header with the actual field names, joined. */
            _keyFieldHeader = _keyFieldIndices.map!(i => lineFields[i]).join(inputFieldDelimiter).to!string;
        }
        return isFirstHeaderLine;
    }

    override void processNextLine(const char[][] lineFields)
    {
        assert(_keyFieldIndices.all!(x => x < lineFields.length));
        assert(_keyFieldIndices.length >= 2);

        /* The unique key is the concatenation of the key fields, delimiter separated. */
        string key = _keyFieldIndices.map!(i => lineFields[i]).join(inputFieldDelimiter).to!string;
        processNextLineWithKey(key, lineFields);
    }
}

version(unittest)
{
    /* testSummarizer is a helper that can run many types of unit tests against
     * Summarizers. It can also test operators, but there are separate helper functions
     * better suited for that purpose.
     *
     * Arguments are command line args, an input file, and the expected output. The
     * input file and expected output are already split into lines and fields; the helper
     * manages re-assembly. The program name from the command line args is printed if an
     * error occurs; it is useful to identify the test that failed.
     *
     * Note: Much of this is a duplication of tsvSummarize logic. Better abstraction of
     * file input/output would enable running unit tests directly on top of tsvSummarize.
     *
     * Update (April 2020): With the introduction of InputSourceRange and ByLineSource,
     * there needs to be a physical file when calling processArgs. It's hard to get around,
     * as the intent is to read the header line of the first input file during command
     * line argument processing. Eventually this unit test process will need to be
     * rewritten. For now, a file with the equivalent data is being added to the command
     * line.
     *
     * Update (Sept 2020): The physical file needs to be closed for unit tests on
     * Windows. This is so the temporary file can be deleted without trouble. Since it's
     * a placeholder in these tests, it gets iterated but not popped off the
     * inputSources and closed. Normal collection is not closing it quickly enough, so
     * all inputSources are closed at the end of this function.
     */
    void testSummarizer(string[] cmdArgs, string[][] file, string[][] expected)
    {
        import std.array : appender;

        assert(cmdArgs.length > 0, "[testSummarizer] cmdArgs must not be empty.");

        /* Prefixes assert messages with the test's program name (cmdArgs[0]). */
        auto formatAssertMessage(T...)(string msg, T formatArgs)
        {
            auto formatString = "[testSummarizer] %s: " ~ msg;
            return format(formatString, cmdArgs[0], formatArgs);
        }

        TsvSummarizeOptions cmdopt;
        auto savedCmdArgs = cmdArgs.to!string;
        auto r = cmdopt.processArgs(cmdArgs);
        assert(r[0], formatAssertMessage("Invalid command line args: '%s'.", savedCmdArgs));

        assert(file.all!(line => line.length >= cmdopt.endFieldIndex),
               formatAssertMessage("group-by or operator field number greater than number of fields a line of the input file."));

        /* Pick the Summarizer based on the number of key-fields entered. */
        auto summarizer =
            (cmdopt.keyFields.length == 0)
            ? new NoKeySummarizer!(typeof(appender!(char[])()))(
                cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy)

            : (cmdopt.keyFields.length == 1)
            ? new OneKeySummarizer!(typeof(appender!(char[])()))(
                cmdopt.keyFields[0], cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy)

            : new MultiKeySummarizer!(typeof(appender!(char[])()))(
                cmdopt.keyFields, cmdopt.inputFieldDelimiter, cmdopt.globalMissingPolicy);

        /* Add the operators to the Summarizer. */
        summarizer.setOperators(inputRangeObject(cmdopt.operators[]));

        /* Process the file one line at a time. */
        auto lineFields = new char[][](cmdopt.endFieldIndex);
        bool headerFound = false;
        foreach (lineNum, line; file.enumerate(1))
        {
            /* Copy the needed fields to the fields array. */
            foreach (i, val; line[0..cmdopt.endFieldIndex]) lineFields[i] = val.dup;

            if (cmdopt.hasHeader && lineNum == 1)
            {
                if (!headerFound)
                {
                    summarizer.processHeaderLine(lineFields);
                    headerFound = true;
                }
            }
            else
            {
                try summarizer.processNextLine(lineFields);
                catch (Exception exc)
                {
                    assert(false, formatAssertMessage(exc.msg));
                }
            }
        }
        auto printOptions = SummarizerPrintOptions(
        cmdopt.inputFieldDelimiter, cmdopt.valuesDelimiter, cmdopt.floatPrecision);

        auto summarizerOutput = appender!(char[])();

        if (cmdopt.hasHeader || cmdopt.writeHeader)
        {
            summarizer.writeSummaryHeader(summarizerOutput, printOptions);
        }

        summarizer.writeSummaryBody(summarizerOutput, printOptions);
        /* Re-assemble the expected output into a single delimited, newline-terminated string. */
        auto expectedOutput = expected.map!(x => x.joiner(cmdopt.inputFieldDelimiter.to!string)).joiner("\n").to!string;
        if (expectedOutput.length > 0 && expectedOutput[$-1] != '\n') expectedOutput ~= "\n";

        assert(summarizerOutput.data == expectedOutput,
               formatAssertMessage(
                   "Result != expected:\n=====Expected=====\n%s=====Actual=======\n%s==================",
                   expectedOutput.to!string, summarizerOutput.data.to!string));

        /* Ensure all files are closed by emptying the stack. */
        while (!cmdopt.inputSources.empty) cmdopt.inputSources.popFront;
    }

    /** Writes fileData to filepath, one record per line, fields joined by delimiter. */
    void writeDataFile(string filepath, string[][] fileData, string delimiter = "\t")
    {
        import std.algorithm;
        import std.stdio;

        auto outputFile = File(filepath, "wb");
        foreach (record; fileData)
        {
            outputFile.writeln(record.joiner(delimiter));
        }
        outputFile.close;
    }
}

unittest
{
    import tsv_utils.common.unittest_utils;   // tsv unit test helpers, from common/src/.
    import std.file : mkdir, rmdirRecurse;
    import std.path : buildPath;

    auto testDir = makeUnittestTempDir("tsv_summarizer");
    scope(exit) testDir.rmdirRecurse;

    /* Summarizer unit tests. Primarily single-key and multi-key summarizers. To a limited
     * extent, command line option handling (TsvSummarizeOptions). Individual operators
     * have separate tests, those tests test the no-key summarizer. The Values operator is
     * used in these tests. It engages a number of behaviors, and the results have limited
     * ambiguity. Using only one operator limits dependence on individual operators.
     *
     * Update (April 2020): There now needs to be a real file passed to testSummarizer.
     * See the comments with testSummarizer for details.
     */

    auto file1 = [["fld1", "fld2", "fld3"],
                  ["a", "a",  "3"],
                  ["c", "a",  "2b"],
                  ["c", "bc", ""],
                  ["a", "c",  "2b"],
                  ["",  "bc", ""],
                  ["c", "bc", "3"]];

    auto file1Path = buildPath(testDir, "file1.tsv");
    auto file1NoHeaderPath = buildPath(testDir, "file1_noheader.tsv");
    writeDataFile(file1Path, file1);
    writeDataFile(file1NoHeaderPath, file1[1 .. $]);

    /* Single-key summarizer tests.
     */
    testSummarizer(["unittest-sk-1", "--header", "--group-by", "1", "--values", "1", file1Path],
                   file1,
                   [["fld1", "fld1_values"],
                    ["a", "a|a"],
                    ["c", "c|c|c"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-sk-1-named", "--header", "--group-by", "fld1", "--values", "fld1", file1Path],
                   file1,
                   [["fld1", "fld1_values"],
                    ["a", "a|a"],
                    ["c", "c|c|c"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-sk-2", "-H", "--group-by", "1", "--values", "2", file1Path],
                   file1,
                   [["fld1", "fld2_values"],
                    ["a", "a|c"],
                    ["c", "a|bc|bc"],
                    ["",  "bc"]]
        );
    testSummarizer(["unittest-sk-2-named", "-H", "--group-by", "fld1", "--values", "fld2", file1Path],
                   file1,
                   [["fld1", "fld2_values"],
                    ["a", "a|c"],
                    ["c", "a|bc|bc"],
                    ["",  "bc"]]
        );
    testSummarizer(["unittest-sk-3", "-H", "-g", "1", "--values", "3", file1Path],
                   file1,
                   [["fld1", "fld3_values"],
                    ["a", "3|2b"],
                    ["c", "2b||3"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-sk-4", "-H", "--group-by", "1", "--values", "1,2,3", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b||3"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-4-named-a", "-H", "--group-by", "fld1", "--values", "fld1,fld2,fld3", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b||3"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-4-named-b", "-H", "--group-by", "fld1", "--values", "fld*", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b||3"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-5", "-H", "--group-by", "1", "--values", "1-3", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b||3"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-6", "-H", "--group-by", "1", "--values", "3,2,1", file1Path],
                   file1,
                   [["fld1", "fld3_values", "fld2_values", "fld1_values"],
                    ["a", "3|2b",  "a|c",     "a|a"],
                    ["c", "2b||3", "a|bc|bc", "c|c|c"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-7", "-H", "--group-by", "1", "--values", "3-1", file1Path],
                   file1,
                   [["fld1", "fld3_values", "fld2_values", "fld1_values"],
                    ["a", "3|2b",  "a|c",     "a|a"],
                    ["c", "2b||3", "a|bc|bc", "c|c|c"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-sk-8", "-H", "--group-by", "2", "--values", "1", file1Path],
                   file1,
                   [["fld2", "fld1_values"],
                    ["a",  "a|c"],
                    ["bc", "c||c"],
                    ["c",  "a"]]
        );
    testSummarizer(["unittest-sk-9", "-H", "--group-by", "2", "--values", "2", file1Path],
                   file1,
                   [["fld2", "fld2_values"],
                    ["a",  "a|a"],
                    ["bc", "bc|bc|bc"],
                    ["c",  "c"]]
        );
    testSummarizer(["unittest-sk-10", "-H", "--group-by", "2", "--values", "3", file1Path],
                   file1,
                   [["fld2", "fld3_values"],
                    ["a",  "3|2b"],
                    ["bc", "||3"],
                    ["c",  "2b"]]
        );
    testSummarizer(["unittest-sk-11", "-H", "--group-by", "2", "--values", "1,3", file1Path],
                   file1,
                   [["fld2", "fld1_values", "fld3_values"],
                    ["a",  "a|c",  "3|2b"],
                    ["bc", "c||c", "||3"],
                    ["c",  "a",    "2b"]]
        );
    testSummarizer(["unittest-sk-12", "-H", "--group-by", "2", "--values", "3,1", file1Path],
                   file1,
                   [["fld2", "fld3_values", "fld1_values"],
                    ["a",  "3|2b", "a|c"],
                    ["bc", "||3",  "c||c"],
                    ["c",  "2b",   "a"]]
        );
    testSummarizer(["unittest-sk-13", "-H", "--group-by", "3", "--values", "1", file1Path],
                   file1,
                   [["fld3", "fld1_values"],
                    ["3",  "a|c"],
                    ["2b", "c|a"],
                    ["",   "c|"]]
        );
    testSummarizer(["unittest-sk-14", "-H", "--group-by", "3", "--values", "2", file1Path],
                   file1,
                   [["fld3", "fld2_values"],
                    ["3",  "a|bc"],
                    ["2b", "a|c"],
                    ["",   "bc|bc"]]
        );
    testSummarizer(["unittest-sk-15", "-H", "--group-by", "3", "--values", "1,2", file1Path],
                   file1,
                   [["fld3", "fld1_values", "fld2_values"],
                    ["3",  "a|c", "a|bc"],
                    ["2b", "c|a", "a|c"],
                    ["",   "c|",  "bc|bc"]]
        );
    testSummarizer(["unittest-sk-15-named", "-H", "--group-by", "fld3", "--values", "fld1,fld2", file1Path],
                   file1,
                   [["fld3", "fld1_values", "fld2_values"],
                    ["3",  "a|c", "a|bc"],
                    ["2b", "c|a", "a|c"],
                    ["",   "c|",  "bc|bc"]]
        );

    /* Multi-key summarizer tests.
     */
    testSummarizer(["unittest-mk-1", "--header", "--group-by", "1,2", "--values", "1", file1Path],
                   file1,
                   [["fld1", "fld2", "fld1_values"],
                    ["a", "a",  "a"],
                    ["c", "a",  "c"],
                    ["c", "bc", "c|c"],
                    ["a", "c",  "a"],
                    ["", "bc",  ""]]
        );
    testSummarizer(["unittest-mk-2", "-H", "--group-by", "1,2", "--values", "2", file1Path],
                   file1,
                   [["fld1", "fld2", "fld2_values"],
                    ["a", "a",  "a"],
                    ["c", "a",  "a"],
                    ["c", "bc", "bc|bc"],
                    ["a", "c",  "c"],
                    ["", "bc",  "bc"]]
        );
    testSummarizer(["unittest-mk-3", "-H", "--group-by", "1,2", "--values", "3", file1Path],
                   file1,
                   [["fld1", "fld2", "fld3_values"],
                    ["a", "a",  "3"],
                    ["c", "a",  "2b"],
                    ["c", "bc", "|3"],
                    ["a", "c",  "2b"],
                    ["", "bc",  ""]]
        );
    testSummarizer(["unittest-mk-4", "-H", "--group-by", "1,2", "--values", "3,1", file1Path],
                   file1,
                   [["fld1", "fld2", "fld3_values", "fld1_values"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-mk-4-named", "-H", "--group-by", "fld1,fld2", "--values", "fld3,fld1", file1Path],
                   file1,
                   [["fld1", "fld2", "fld3_values", "fld1_values"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-mk-5", "-H", "--group-by", "3,2", "--values", "1", file1Path],
                   file1,
                   [["fld3", "fld2", "fld1_values"],
                    ["3",  "a",  "a"],
                    ["2b", "a",  "c"],
                    ["",   "bc", "c|"],
                    ["2b", "c",  "a"],
                    ["3",  "bc", "c"]]
        );
    testSummarizer(["unittest-mk-6", "-H", "--group-by", "3-2", "--values", "1", file1Path],
                   file1,
                   [["fld3", "fld2", "fld1_values"],
                    ["3",  "a",  "a"],
                    ["2b", "a",  "c"],
                    ["",   "bc", "c|"],
                    ["2b", "c",  "a"],
                    ["3",  "bc", "c"]]
        );
    testSummarizer(["unittest-mk-7", "-H", "--group-by", "2,1,3", "--values", "2", file1Path],
                   file1,
                   [["fld2", "fld1", "fld3", "fld2_values"],
                    ["a",  "a", "3",  "a"],
                    ["a",  "c", "2b", "a"],
                    ["bc", "c", "",   "bc"],
                    ["c",  "a", "2b", "c"],
                    ["bc", "",  "",   "bc"],
                    ["bc", "c", "3",  "bc"]]
        );

    /* Missing policies. */
    testSummarizer(["unittest-mis-1", "--header", "--group-by", "1", "--values", "1", "--exclude-missing", file1Path],
                   file1,
                   [["fld1", "fld1_values"],
                    ["a", "a|a"],
                    ["c", "c|c|c"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-mis-2", "-H", "--group-by", "1", "--values", "2", "-x", file1Path],
                   file1,
                   [["fld1", "fld2_values"],
                    ["a", "a|c"],
                    ["c", "a|bc|bc"],
                    ["",  "bc"]]
        );
    testSummarizer(["unittest-mis-3", "-H", "-g", "1", "--values", "3", "-x", file1Path],
                   file1,
                   [["fld1", "fld3_values"],
                    ["a", "3|2b"],
                    ["c", "2b|3"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-mis-4", "-H", "--group-by", "1", "--values", "1,2,3", "-x", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b|3"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-mis-5", "--header", "--group-by", "1", "--values", "1", "--replace-missing", "NA", file1Path],
                   file1,
                   [["fld1", "fld1_values"],
                    ["a", "a|a"],
                    ["c", "c|c|c"],
                    ["",  "NA"]]
        );
    testSummarizer(["unittest-mis-6", "-H", "--group-by", "1", "--values", "2", "-r", "NA", file1Path],
                   file1,
                   [["fld1", "fld2_values"],
                    ["a", "a|c"],
                    ["c", "a|bc|bc"],
                    ["",  "bc"]]
        );
    testSummarizer(["unittest-mis-7", "-H", "-g", "1", "--values", "3", "-r", "NA", file1Path],
                   file1,
                   [["fld1", "fld3_values"],
                    ["a", "3|2b"],
                    ["c", "2b|NA|3"],
                    ["",  "NA"]]
        );
    testSummarizer(["unittest-mis-7-named", "-H", "-g", "fld1", "--values", "fld3", "-r", "NA", file1Path],
                   file1,
                   [["fld1", "fld3_values"],
                    ["a", "3|2b"],
                    ["c", "2b|NA|3"],
                    ["",  "NA"]]
        );
    testSummarizer(["unittest-mis-8", "-H", "--group-by", "1", "--values", "1,2,3", "-r", "NA", file1Path],
                   file1,
                   [["fld1", "fld1_values", "fld2_values", "fld3_values"],
                    ["a", "a|a",   "a|c",     "3|2b"],
                    ["c", "c|c|c", "a|bc|bc", "2b|NA|3"],
                    ["",  "NA",      "bc",      "NA"]]
        );
    testSummarizer(["unittest-mis-9", "-H", "--group-by", "1,2", "--values", "3,1", "-x", file1Path],
                   file1,
                   [["fld1", "fld2", "fld3_values", "fld1_values"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-mis-10", "-H", "--group-by", "3,2", "--values", "1", "-x", file1Path],
                   file1,
                   [["fld3", "fld2", "fld1_values"],
                    ["3",  "a",  "a"],
                    ["2b", "a",  "c"],
                    ["",   "bc", "c"],
                    ["2b", "c",  "a"],
                    ["3",  "bc", "c"]]
        );
    testSummarizer(["unittest-mis-11", "-H", "--group-by", "2,1,3", "--values", "2", "-x", file1Path],
                   file1,
                   [["fld2", "fld1", "fld3", "fld2_values"],
                    ["a",  "a", "3",  "a"],
                    ["a",  "c", "2b", "a"],
                    ["bc", "c", "",   "bc"],
                    ["c",  "a", "2b", "c"],
                    ["bc", "",  "",   "bc"],
                    ["bc", "c", "3",  "bc"]]
        );
    testSummarizer(["unittest-mis-12", "-H", "--group-by", "1,2", "--values", "3,1", "-r", "NA", file1Path],
                   file1,
                   [["fld1", "fld2", "fld3_values", "fld1_values"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "NA|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "NA",   "NA"]]
        );
    testSummarizer(["unittest-mis-13", "-H", "--group-by", "3,2", "--values", "1", "-r", "NA", file1Path],
                   file1,
                   [["fld3", "fld2", "fld1_values"],
                    ["3",  "a",  "a"],
                    ["2b", "a",  "c"],
                    ["",   "bc", "c|NA"],
                    ["2b", "c",  "a"],
                    ["3",  "bc", "c"]]
        );
    testSummarizer(["unittest-mis-14", "-H", "--group-by", "2,1,3", "--values", "2", "-r", "NA", file1Path],
                   file1,
                   [["fld2", "fld1", "fld3", "fld2_values"],
                    ["a",  "a", "3",  "a"],
                    ["a",  "c", "2b", "a"],
                    ["bc", "c", "",   "bc"],
                    ["c",  "a", "2b", "c"],
                    ["bc", "",  "",   "bc"],
                    ["bc", "c", "3",  "bc"]]
        );

    /* Validate that the no-key summarizer works with testSummarizer helper function.
     */
    testSummarizer(["unittest-nk-1", "-H", "--values", "1,2", file1Path],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a|c|c|a||c", "a|a|bc|c|bc|bc"]]
        );
    testSummarizer(["unittest-nk-1-named", "-H", "--values", "fld1,fld2", file1Path],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a|c|c|a||c", "a|a|bc|c|bc|bc"]]
        );

    /* Header variations: no header line; auto-generated header line; custom headers.
     */
    testSummarizer(["unittest-hdr-1", "--group-by", "1", "--values", "1", file1NoHeaderPath],
                   file1[1..$],
                   [["a", "a|a"],
                    ["c", "c|c|c"],
                    ["",  ""]]
        );
    testSummarizer(["unittest-hdr-2", "--group-by", "1,2", "--values", "2", file1NoHeaderPath],
                   file1[1..$],
                   [["a", "a",  "a"],
                    ["c", "a",  "a"],
                    ["c", "bc", "bc|bc"],
                    ["a", "c",  "c"],
                    ["", "bc",  "bc"]]
        );
    testSummarizer(["unittest-hdr-3", "--write-header", "--group-by", "2", "--values", "1", file1NoHeaderPath],
                   file1[1..$],
                   [["field2", "field1_values"],
                    ["a",  "a|c"],
                    ["bc", "c||c"],
                    ["c",  "a"]]
        );
    testSummarizer(["unittest-hdr-4", "-w", "--group-by", "3,2", "--values", "1", file1NoHeaderPath],
                   file1[1..$],
                   [["field3", "field2", "field1_values"],
                    ["3",  "a",  "a"],
                    ["2b", "a",  "c"],
                    ["",   "bc", "c|"],
                    ["2b", "c",  "a"],
                    ["3",  "bc", "c"]]
        );
    testSummarizer(["unittest-hdr-5", "-H", "--group-by", "2", "--values", "3:Field3Values", file1Path],
                   file1,
                   [["fld2", "Field3Values"],
                    ["a",  "3|2b"],
                    ["bc", "||3"],
                    ["c",  "2b"]]
        );
    testSummarizer(["unittest-hdr-6", "-H", "--group-by", "1,2", "--values", "3:FieldThreeValues", "--values", "1:FieldOneValues", file1Path],
                   file1,
                   [["fld1", "fld2", "FieldThreeValues", "FieldOneValues"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-hdr-6-named-a", "-H", "--group-by", "fld1,fld2", "--values", "fld3:FieldThreeValues", "--values", "fld1:FieldOneValues", file1Path],
                   file1,
                   [["fld1", "fld2", "FieldThreeValues", "FieldOneValues"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-hdr-6-named-b", "-H", "--group-by", "fld1,fld2", "--values", "fld3 FieldThreeValues", "--values", "fld1 FieldOneValues", file1Path],
                   file1,
                   [["fld1", "fld2", "FieldThreeValues", "FieldOneValues"],
                    ["a", "a",  "3", "a"],
                    ["c", "a",  "2b", "c"],
                    ["c", "bc", "|3", "c|c"],
                    ["a", "c",  "2b", "a"],
                    ["",  "bc", "",   ""]]
        );
    testSummarizer(["unittest-hdr-7", "--write-header", "--group-by", "1", "--values", "3:f3_vals","--values", "2:f2_vals", "--values", "1:f1_vals", file1NoHeaderPath],
                   file1[1..$],
                   [["field1", "f3_vals", "f2_vals", "f1_vals"],
                    ["a", "3|2b",  "a|c",     "a|a"],
                    ["c", "2b||3", "a|bc|bc", "c|c|c"],
                    ["",  "",      "bc",      ""]]
        );
    testSummarizer(["unittest-hdr-8", "--write-header", "--group-by", "1,3,2", "--values", "3", "--values", "1:ValsField1", "--values", "2:ValsField2", file1NoHeaderPath],
                   file1[1..$],
                   [["field1", "field3", "field2", "field3_values", "ValsField1", "ValsField2"],
                    ["a", "3",  "a",  "3",  "a", "a"],
                    ["c", "2b", "a",  "2b", "c", "a"],
                    ["c", "",   "bc", "",   "c", "bc"],
                    ["a", "2b", "c",  "2b", "a", "c"],
                    ["",  "",   "bc", "",   "",  "bc"],
                    ["c", "3",  "bc", "3",  "c", "bc"]]
        );
    testSummarizer(["unittest-hdr-9", "--write-header", "--group-by", "1,3-2", "--values", "3", "--values", "1:ValsField1", "--values", "2:ValsField2", file1NoHeaderPath],
                   file1[1..$],
                   [["field1", "field3", "field2", "field3_values", "ValsField1", "ValsField2"],
                    ["a", "3",  "a",  "3",  "a", "a"],
                    ["c", "2b", "a",  "2b", "c", "a"],
                    ["c", "",   "bc", "",   "c", "bc"],
                    ["a", "2b", "c",  "2b", "a", "c"],
                    ["",  "",   "bc", "",   "",  "bc"],
                    ["c", "3",  "bc", "3",  "c", "bc"]]
        );

    /* Alternate file widths and lengths.
     */

    auto file3x2 = [["fld1", "fld2", "fld3"],
                    ["a", "b", "c"],
                    ["c", "b", "a"]];

    auto file3x2Path = buildPath(testDir, "file3x2.tsv");
    auto file3x2NoHeaderPath = buildPath(testDir, "file3x2_noheader.tsv");
    writeDataFile(file3x2Path, file3x2);
    writeDataFile(file3x2NoHeaderPath, file3x2[1 .. $]);

    testSummarizer(["unittest-3x2-1", "-H", "--group-by", "1", "--values", "3", file3x2Path],
                   file3x2,
                   [["fld1", "fld3_values"],
                    ["a", "c"],
                    ["c", "a"]]
        );
    testSummarizer(["unittest-3x2-2", "-H", "--group-by", "2", "--values", "3", file3x2Path],
                   file3x2,
                   [["fld2", "fld3_values"],
                    ["b", "c|a"]]
        );
    testSummarizer(["unittest-3x2-3", "-H", "--group-by", "2,1", "--values", "3", file3x2Path],
                   file3x2,
                   [["fld2", "fld1", "fld3_values"],
                    ["b", "a", "c"],
                    ["b", "c", "a"]]
        );

    auto file3x1 = [["fld1", "fld2", "fld3"],
                    ["a", "b", "c"]];

    auto file3x1Path = buildPath(testDir, "file3x1.tsv");
    auto file3x1NoHeaderPath = buildPath(testDir, "file3x1_noheader.tsv");
    writeDataFile(file3x1Path, file3x1);
    writeDataFile(file3x1NoHeaderPath, file3x1[1 .. $]);

    testSummarizer(["unittest-3x1-1", "-H", "--group-by", "1", "--values", "3", file3x1Path],
                   file3x1,
                   [["fld1", "fld3_values"],
                    ["a", "c"]]
        );
    testSummarizer(["unittest-3x1-2", "--group-by", "1", "--values", "3", file3x1NoHeaderPath],
                   file3x1[1..$],
                   [["a", "c"]]
        );
    testSummarizer(["unittest-3x1-3", "-H", "--group-by", "2,1", "--values", "3", file3x1Path],
                   file3x1,
                   [["fld2", "fld1", "fld3_values"],
                    ["b", "a", "c"]]
        );
    testSummarizer(["unittest-3x1-3-named", "-H", "--group-by", "fld2,fld1", "--values", "fld3", file3x1Path],
                   file3x1,
                   [["fld2", "fld1", "fld3_values"],
                    ["b", "a", "c"]]
        );
    testSummarizer(["unittest-3x1-4", "--group-by", "2,1", "--values", "3", file3x1NoHeaderPath],
                   file3x1[1..$],
                   [["b", "a", "c"]]
        );

    auto file3x0 = [["fld1", "fld2", "fld3"]];

    auto file3x0Path = buildPath(testDir, "file3x0.tsv");
    auto file3x0NoHeaderPath = buildPath(testDir, "file3x0_noheader.tsv");
    writeDataFile(file3x0Path, file3x0);
    writeDataFile(file3x0NoHeaderPath, file3x0[1 .. $]);


    testSummarizer(["unittest-3x0-1", "-H", "--group-by", "1", "--values", "3", file3x0Path],
                   file3x0,
                   [["fld1", "fld3_values"]]
        );
    testSummarizer(["unittest-3x0-1-named", "-H", "--group-by", "fld1", "--values", "fld3", file3x0Path],
                   file3x0,
                   [["fld1", "fld3_values"]]
        );
    testSummarizer(["unittest-3x0-2", "--group-by", "1", "--values", "3", file3x0NoHeaderPath],
                   file3x0[1..$],
                   []
        );
    testSummarizer(["unittest-3x0-3", "--write-header", "--group-by", "1", "--values", "3", file3x0NoHeaderPath],
                   file3x0[1..$],
                   [["field1", "field3_values"]]
        );


    testSummarizer(["unittest-3x0-4", "-H", "--group-by", "2,1", "--values", "3", file3x0Path],
                   file3x0,
                   [["fld2", "fld1", "fld3_values"]]
        );

    testSummarizer(["unittest-3x0-5", "--group-by", "2,1", "--values", "3", file3x0NoHeaderPath],
                   file3x0[1..$],
                   []
        );

    testSummarizer(["unittest-3x0-6", "--write-header", "--group-by", "2,1", "--values", "3", file3x0NoHeaderPath],
                   file3x0[1..$],
                   [["field2", "field1", "field3_values"]]
        );

    auto file2x1 = [["fld1", "fld2"],
                    ["a", "b"]];

    auto file2x1Path = buildPath(testDir, "file2x1.tsv");
    auto file2x1NoHeaderPath = buildPath(testDir, "file2x1_noheader.tsv");
    writeDataFile(file2x1Path, file2x1);
    writeDataFile(file2x1NoHeaderPath, file2x1[1 .. $]);

    testSummarizer(["unittest-2x1-1", "-H", "--group-by", "1", "--values", "2", file2x1Path],
                   file2x1,
                   [["fld1", "fld2_values"],
                    ["a", "b"]]
        );
    testSummarizer(["unittest-2x1-2", "-H", "--group-by", "2,1", "--values", "1", file2x1Path],
                   file2x1,
                   [["fld2", "fld1", "fld1_values"],
                    ["b", "a", "a"]]
        );

    auto file2x0 = [["fld1", "fld2"]];

    auto file2x0Path = buildPath(testDir, "file2x0.tsv");
    auto file2x0NoHeaderPath = buildPath(testDir, "file2x0_noheader.tsv");
    writeDataFile(file2x0Path, file2x0);
    writeDataFile(file2x0NoHeaderPath, file2x0[1 .. $]);

    testSummarizer(["unittest-2x0-1", "-H", "--group-by", "1", "--values", "2", file2x0Path],
                   file2x0,
                   [["fld1", "fld2_values"]]
        );
    testSummarizer(["unittest-2x0-2", "-H", "--group-by", "2,1", "--values", "1", file2x0Path],
                   file2x0,
                   [["fld2", "fld1", "fld1_values"]]
        );

    auto file1x2 = [["fld1"],
                    ["a"],
                    [""]];

    auto file1x2Path = buildPath(testDir, "file1x2.tsv");
    auto file1x2NoHeaderPath = buildPath(testDir, "file1x2_noheader.tsv");
    writeDataFile(file1x2Path, file1x2);
    writeDataFile(file1x2NoHeaderPath, file1x2[1 .. $]);

    testSummarizer(["unittest-1x2-1", "-H", "--group-by", "1", "--values", "1", file1x2Path],
                   file1x2,
                   [["fld1", "fld1_values"],
                    ["a", "a"],
                    ["",  ""]]
        );

    auto file1x2b = [["fld1"],
                     [""],
                     [""]];

    auto file1x2bPath = buildPath(testDir, "file1x2b.tsv");
    auto file1x2bNoHeaderPath = buildPath(testDir, "file1x2b_noheader.tsv");
    writeDataFile(file1x2bPath, file1x2b);
    writeDataFile(file1x2bNoHeaderPath, file1x2b[1 .. $]);

    testSummarizer(["unittest-1x2b-2", "-H", "--group-by", "1", "--values", "1", file1x2bPath],
                   file1x2b,
                   [["fld1", "fld1_values"],
                    ["", "|"]]
        );

    auto file1x1 = [["fld1"],
                    ["x"]];

    auto file1x1Path = buildPath(testDir, "file1x1.tsv");
    auto file1x1NoHeaderPath = buildPath(testDir, "file1x1_noheader.tsv");
    writeDataFile(file1x1Path, file1x1);
    writeDataFile(file1x1NoHeaderPath, file1x1[1 .. $]);

    testSummarizer(["unittest-1x1-1", "-H", "--group-by", "1", "--values", "1", file1x1Path],
                   file1x1,
                   [["fld1", "fld1_values"],
                    ["x", "x"]]
        );
    testSummarizer(["unittest-1x1-1-named", "-H", "--group-by", "fld1", "--values", "fld1", file1x1Path],
                   file1x1,
                   [["fld1", "fld1_values"],
                    ["x", "x"]]
        );

    testSummarizer(["unittest-1x1-2", "--group-by", "1", "--values", "1", file1x1NoHeaderPath],
                   file1x1[1..$],
                   [["x", "x"]]
        );

    testSummarizer(["unittest-1x1-3", "--write-header", "--group-by", "1", "--values", "1", file1x1NoHeaderPath],
                   file1x1[1..$],
                   [["field1", "field1_values"],
                    ["x", "x"]]
        );

    auto file1x1b = [["fld1"],
                    [""]];

    auto file1x1bPath = buildPath(testDir, "file1x1b.tsv");
    auto file1x1bNoHeaderPath = buildPath(testDir, "file1x1b_noheader.tsv");
    writeDataFile(file1x1bPath, file1x1b);
    writeDataFile(file1x1bNoHeaderPath, file1x1b[1 .. $]);

    testSummarizer(["unittest-1x1b-1", "-H", "--group-by", "1", "--values", "1", file1x1bPath],
                   file1x1b,
                   [["fld1", "fld1_values"],
                    ["", ""]]
        );

    auto file1x0 = [["fld1"]];

    auto file1x0Path = buildPath(testDir, "file1x0.tsv");
    auto file1x0NoHeaderPath = buildPath(testDir, "file1x0_noheader.tsv");
    writeDataFile(file1x0Path, file1x0);
    writeDataFile(file1x0NoHeaderPath, file1x0[1 .. $]);

    testSummarizer(["unittest-1x0-1", "-H", "--group-by", "1", "--values", "1", file1x0Path],
                   file1x0,
                   [["fld1", "fld1_values"]]
        );

    testSummarizer(["unittest-1x0-2", "--group-by", "1", "--values", "1", file1x0NoHeaderPath],
                   file1x0[1..$],
                   []
        );

    testSummarizer(["unittest-1x0-3", "--write-header", "--group-by", "1", "--values", "1", file1x0NoHeaderPath],
                   file1x0[1..$],
                   [["field1", "field1_values"]]
        );

    /* Alternate delimiters.
     *
     * Note: In current unit test setup the data is already in memory (file1).
     * 'file1Path' points to a file with equivalent data, but not read, except if
     * processing the header line. A data file is created for the '%' and '#'
     * delimiter cases (these read the header), but we don't bother for the others.
     */
    auto file1PctDelimPath = buildPath(testDir, "file1PctDelim.tsv");
    auto file1HashDelimPath = buildPath(testDir, "file1HashDelim.tsv");
    writeDataFile(file1PctDelimPath, file1, "%");
    writeDataFile(file1HashDelimPath, file1, "#");

    testSummarizer(["unittest-delim-1", "-H", "--values", "1,2", "--delimiter", "%", file1PctDelimPath],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a|c|c|a||c", "a|a|bc|c|bc|bc"]]
        );
    testSummarizer(["unittest-delim-1-named", "-H", "--values", "fld1,fld2", "--delimiter", "%", file1PctDelimPath],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a|c|c|a||c", "a|a|bc|c|bc|bc"]]
        );
    testSummarizer(["unittest-delim-2", "-H", "--values", "1-2", "--values-delimiter", "$", file1Path],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a$c$c$a$$c", "a$a$bc$c$bc$bc"]]
        );
    testSummarizer(["unittest-delim-3", "-H", "--values", "1,2", "--delimiter", "#", "--values-delimiter", ",", file1HashDelimPath],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a,c,c,a,,c", "a,a,bc,c,bc,bc"]]
        );
    testSummarizer(["unittest-delim-3-named", "-H", "--values", "fld1,fld2", "--delimiter", "#", "--values-delimiter", ",", file1HashDelimPath],
                   file1,
                   [["fld1_values", "fld2_values"],
                    ["a,c,c,a,,c", "a,a,bc,c,bc,bc"]]
        );
    testSummarizer(["unittest-delim-4", "--write-header", "--group-by", "2", "--values", "1",
                    "--delimiter", "^", "--values-delimiter", ":", file1NoHeaderPath],
                   file1[1..$],
                   [["field2", "field1_values"],
                    ["a",  "a:c"],
                    ["bc", "c::c"],
                    ["c",  "a"]]
        );
    testSummarizer(["unittest-delim-5", "--group-by", "1,2", "--values", "2", "--delimiter", "/",
                    "--values-delimiter", "\\", file1NoHeaderPath],
                   file1[1..$],
                   [["a", "a",  "a"],
                    ["c", "a",  "a"],
                    ["c", "bc", "bc\\bc"],
                    ["a", "c",  "c"],
                    ["", "bc",  "bc"]]
        );
}

/* Summary Operators and Calculators
 *
 * Two types of objects are used in implementation: Operators and Calculators. An Operator
 * represents a summary calculation specified on the command line, e.g. '--mean 5'. A
 * Calculator is used to manage the summary calculation for each unique key in the input.
 *
 * As an example, consider the command:
 *
 *    $ tsv-summarize --group-by 1 --mean 3 --mean 5
 *
 * This command will create two instances of a MeanOperator, one each for fields 3 and 5.
 * They produce the output field headers (e.g. "field3_mean", "field5_mean"). They also
 * create MeanCalculator objects for each unique value in field 1. For 'mean', a
 * calculator needs to track occurrence count and sum. Calculators produce the final
 * value when all processing is finished.
 *
 * Summary field headers
 *
 * There are several options for specifying summary field headers. The defaults combine the
 * operator name and the header of the field summarized. The defaults can be overridden on
 * on the command line. These scenarios are supported via the operator constructor and the
 * processHeaderLine() method.
 *
 * Missing field policy
 *
 * At present, tsv-summarize has a single policy for handling missing values that applies
 * to all operators. However, it is logically operator specific and is implemented that
 * way. The MissingFieldPolicy struct describes the policy, each operator contains one.
 *  Calculators access their operator's policy struct.
 */

/** An Operator represents a summary calculation specified on the command line.
 *  e.g. '--mean 5'.
 *
 *  An Operator produces the output header for its summary field and creates a
 *  Calculator for each unique key found in the input. It also advertises any
 *  input fields whose values must be saved for end-of-input calculations
 *  (see SharedFieldValues / UniqueKeyValuesLists).
 */
interface Operator
{
    @property string header();          // Output header for the summary field.
    @property string name();            // Operator name, e.g. "mean".
    void processHeaderLine(const char[][] fields);  // Invoked with the input header fields, when a header is present.
    size_t[] numericFieldsToSave();     // Numeric fields this Operator needs saved
    size_t[] textFieldsToSave();        // Text fields this Operator needs saved
    Calculator makeCalculator();        // Creates a Calculator for a new unique key.
}

/** Calculators are responsible for the calculation of a single computation. They
 *  process each line and produce the final value when all processing is finished.
 */
interface Calculator
{
    void processNextLine(const char[][] fields);    // Invoked once per input line.
    /* Invoked after all input has been read; returns the formatted result value.
     * Saved field values, if any, are retrieved from valuesLists.
     */
    string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions);
}

/** Describes how the program treats fields with missing (empty) values.
 *
 *  Exactly one of three behaviors is in effect at a time:
 *   - Use: missing values are processed unchanged (the default).
 *   - Replace: missing values are swapped for a replacement string.
 *   - Exclude: entries with missing values are dropped from the calculation.
 */
final class MissingFieldPolicy
{
    private bool _useMissing = true;          // Missing values are processed unchanged.
    private bool _replaceMissing = false;     // Missing values are replaced.
    private string _missingReplacement;       // Replacement text when _replaceMissing is set.

    this (const bool excludeMissing = false, string missingReplacement = "")
    {
        updatePolicy(excludeMissing, missingReplacement);
    }

    /* Reconfigures the policy. A non-empty replacement string selects replacement
     * mode regardless of excludeMissing; otherwise excludeMissing chooses between
     * exclusion and pass-through.
     */
    void updatePolicy(const bool excludeMissing, string missingReplacement)
    {
        _missingReplacement = missingReplacement;
        _replaceMissing = missingReplacement.length > 0;
        _useMissing = !(excludeMissing || _replaceMissing);
    }

    /* True if the field value counts as missing, i.e. it is empty. */
    final bool isMissingField(const char[] field) const
    {
        return field.length == 0;
    }

    /* True when missing values are passed through unchanged. */
    final bool useMissing() const @property
    {
        return _useMissing;
    }

    /* True when missing values are replaced by the replacement string. */
    final bool replaceMissing() const @property
    {
        return _replaceMissing;
    }

    /* The string substituted for missing values in replacement mode. */
    final string missingReplacement() const @property
    {
        return _missingReplacement;
    }

    /* True when entries with missing values are excluded from the calculation. */
    final bool excludeMissing() const @property
    {
        return !(_useMissing || _replaceMissing);
    }
}

/* The SharedFieldValues and UniqueKeyValuesLists classes manage lists of values collected
 * while reading data. Operations like median collect all values and operate on them when
 * running the final calculation. Value lists are needed for each unique key. A command
 * using multiple Operators may save multiple fields. And, different Operators may be run
 * against the same field.
 *
 * The last part motivates these classes. Handling large data sets necessitates minimizing
 * in-memory storage, making it desirable to share identical lists between Calculators.
 * Otherwise, each Calculator could implement its own storage, which would be simpler.
 *
 * The setup works as follows:
 *  - Operators advertise fields they need saved ([text|numeric]FieldsToSave methods).
 *  - The SummarizerBase object keeps a SharedFieldValues object, which in turn keeps list
 *    of the fields advertised by Operators as needing sharing. This list gets created
 *    during command initialization (SummarizerBase.setOperators).
 *  - The SharedFieldValues object is used to create a UniqueKeyValuesLists object every
 *    time a new unique key is found, in parallel to the Calculator objects created for the
 *    key. The UniqueKeyValuesLists objects are managed by derived Summarizer classes.
 *  - A unique key's UniqueKeyValuesLists object is passed each input line, same as
 *    Calculators, saving the values.
 *  - Calculators retrieve the saved values during the calculation phase. The calculator's
 *    ProcessNextField method is typically a no-op.
 *  - Calculators cannot make assumptions about the order of the saved values. This is
 *    a pragmatic concession to median and quantile calculations, which need to sort the data,
 *    at least partially. Rather than generate sorted copies, the current algorithms
 *    sort the data in place.
 *
 * One concession to duplicate storage is that text and numeric versions of the same
 * field might be stored. The reason is because it's important to convert text to numbers
 * as they are read so that useful error messages can be generated. And, storing both
 * forms of the same field should be less common.
 *
 * The current implementation uses the same missing values policy for all fields. If
 * multiple policies become supported this will need to change.
 *
 * Built-in calculations - UniqueKeyValuesLists have a built-in median operation. This is
 * to avoid repeated calculation of the median by different Calculators.
 */

/** Tracks which input fields must have their values saved for final calculations.
 *
 * Operators such as median need every value of a field at calculation time. This
 * class records the set of field indices requiring saved values (numeric and text
 * tracked separately) and builds the per-key value-list holder on demand.
 */
final class SharedFieldValues
{
    private size_t[] _numericFieldIndices;    // Fields saved in numeric (double) form.
    private size_t[] _textFieldIndices;       // Fields saved in text (string) form.

    /* Registers a field index whose values must be saved in numeric form. For
     * example, '--median 7' ends up calling addNumericIndex(6), 6 being the
     * zero-based index. Duplicate registrations are ignored.
     */
    final void addNumericIndex (size_t index)
    {
        if (_numericFieldIndices.canFind(index)) return;
        _numericFieldIndices ~= index;
    }

    /* Registers a field index whose values must be saved in text form. Duplicate
     * registrations are ignored.
     */
    final void addTextIndex (size_t index)
    {
        if (_textFieldIndices.canFind(index)) return;
        _textFieldIndices ~= index;
    }

    /* Builds the value-list holder for one unique key. Called every time a new key
     * is found, or once at startup when no key fields are used (whole column
     * summarized).
     */
    final UniqueKeyValuesLists makeUniqueKeyValuesLists()
    {
        return new UniqueKeyValuesLists(_numericFieldIndices, _textFieldIndices);
    }
}

/** Holds the saved field value lists for a single unique key.
 *
 * A FieldValues object is a list of values collected for a specific field. A
 * unique key may hold several. For example, the command:
 *     $ tsv-summarize --group-by 1 --median 4 --median 5
 * requires keeping lists for both fields 4 and 5. This results in
 * _numericFieldValues being a 2 element array, one with a list of field 4 values,
 * the second of field 5 values. Linear search is used to find a specific field.
 */
final class UniqueKeyValuesLists
{
    private FieldValues!double[] _numericFieldValues;   // One entry per saved numeric field.
    private FieldValues!string[] _textFieldValues;      // One entry per saved text field.
    /* NOTE(review): _numericFieldMedians is not referenced anywhere in this class.
     * D 'private' is module-scoped, so it may be used elsewhere in the module — confirm.
     */
    private double[] _numericFieldMedians;

    /* The constructor takes the arrays of field indices (numeric and text) whose
     * values are to be saved, creating one FieldValues list per index.
     */
    this(const size_t[] numericFieldIndices, const size_t[] textFieldIndices)
    {
        if (numericFieldIndices.length > 0)
        {
            _numericFieldValues = new FieldValues!double[](numericFieldIndices.length);
            foreach (i, fieldIndex; numericFieldIndices)
                _numericFieldValues[i] = new FieldValues!double(fieldIndex);
        }

        if (textFieldIndices.length > 0)
        {
            _textFieldValues = new FieldValues!string[](textFieldIndices.length);
            foreach (i, fieldIndex; textFieldIndices)
                _textFieldValues[i] = new FieldValues!string(fieldIndex);
        }
    }

    /* Appends each saved field's value from the current input line, applying the
     * missing-value policy.
     */
    void processNextLine(const char[][] fields, MissingFieldPolicy missingPolicy)
    {
        _numericFieldValues.each!((ref x) => x.processNextLine(fields, missingPolicy));
        _textFieldValues.each!((ref x) => x.processNextLine(fields, missingPolicy));
    }

    /* Linear search for the numeric value list of a field index. The index must
     * have been registered at construction time.
     */
    private FieldValues!double findNumericFieldValues(size_t index)
    {
        alias pred = (FieldValues!double a, size_t b) => (a.fieldIndex == b);
        auto r = find!pred(_numericFieldValues, index);
        assert(!r.empty);
        return r.front;
    }

    /* Linear search for the text value list of a field index. The index must have
     * been registered at construction time.
     */
    private FieldValues!string findTextFieldValues(size_t index)
    {
        alias pred = (FieldValues!string a, size_t b) => (a.fieldIndex == b);
        auto r = find!pred(_textFieldValues, index);
        assert(!r.empty);
        return r.front;
    }

    /* The saved numeric values for a field, in unspecified order. */
    final double[] numericValues(size_t index)
    {
        return findNumericFieldValues(index).getArray;
    }

    /* The saved numeric values for a field, sorted in place. */
    final double[] numericValuesSorted(size_t index)
    {
        return findNumericFieldValues(index).getSortedArray;
    }

    /* The saved text values for a field, in unspecified order. */
    final string[] textValues(size_t index)
    {
        return findTextFieldValues(index).getArray;
    }

    /* The saved text values for a field, sorted in place. */
    final string[] textValuesSorted(size_t index)
    {
        return findTextFieldValues(index).getSortedArray;
    }

    /* The median of the saved numeric values for a field. Cached after the first
     * computation so multiple Calculators can share it.
     */
    final double numericValuesMedian(size_t index)
    {
        return findNumericFieldValues(index).median;
    }

    /* A FieldValues instance collects the values of one field, parameterized on the
     * stored value type (double or string). It caches the sorted state and the
     * median; appending a value invalidates both caches.
     */
    private final class FieldValues(ValueType)
    {
        import std.array : appender;
        private size_t _fieldIndex;                  // Zero-based index of the tracked field.
        private Appender!(ValueType[]) _values;      // The collected values.
        private bool _haveMedian = false;            // True when _medianValue is current.
        private bool _isSorted = false;              // True when _values.data is sorted.
        private ValueType _medianValue;              // Cached median; valid iff _haveMedian.

        this(size_t fieldIndex)
        {
            _fieldIndex = fieldIndex;
        }

        /* Number of values collected so far. */
        final size_t length() const @property
        {
            return _values.data.length;
        }

        /* Zero-based index of the field being tracked. */
        final size_t fieldIndex() const @property
        {
            return _fieldIndex;
        }

        /* Saves this field's value from the input line, honoring the missing-value
         * policy: use unchanged, replace, or skip. Appending invalidates the
         * sorted/median caches.
         */
        final void processNextLine(const char[][] fields, MissingFieldPolicy missingPolicy)
        {
            debug writefln("[%s]: %s", __FUNCTION__, fields.to!string);

            const char[] field = fields[_fieldIndex];
            if (missingPolicy.useMissing || !missingPolicy.isMissingField(field))
            {
                _values.put(field.to!ValueType);
                _haveMedian = false;
                _isSorted = false;
            }
            else if (missingPolicy.replaceMissing)
            {
                _values.put(missingPolicy.missingReplacement.to!ValueType);
                _haveMedian = false;
                _isSorted = false;
            }
        }

        /* Return an input range of the values. */
        final auto values()
        {
            return _values.data;
        }

        /* The collected values; order unspecified (may have been sorted in place). */
        final ValueType[] getArray()
        {
            return _values.data;
        }

        /* The collected values, sorted in place. The sorted state is cached. */
        final ValueType[] getSortedArray()
        {
            if (!_isSorted)
            {
                import std.algorithm : sort;
                sort(_values.data);
                _isSorted = true;
            }
            return _values.data;
        }

        /* The median of the collected values, cached after the first computation.
         * rangeMedian may partially reorder the underlying data.
         */
        final ValueType median()
        {
            if (!_haveMedian)
            {
                import tsv_utils.common.numerics : rangeMedian;
                _medianValue = _values.data.rangeMedian();
                _haveMedian = true;
            }

            return _medianValue;
        }
    }
}

/** SingleFieldOperator is a base class for single field operators, the most common
 * Operator. Derived classes implement makeCalculator and the Calculator class it
 * returns.
 */
class SingleFieldOperator : Operator
{
    import std.typecons : Flag;

    private string _name;                     // Operator name, e.g. "mean".
    private string _header;                   // Output header for the summary field.
    private size_t _fieldIndex;               // Zero-based index of the summarized field.
    private bool _useHeaderSuffix;            // Append operator name to default headers.
    private bool _allowCustomHeader;          // Whether setCustomHeader is permitted.
    private bool _hasCustomHeader = false;    // A custom header has been installed.
    private size_t[] _numericFieldsToSave;    // Field indices needing numeric value lists.
    private size_t[] _textFieldsToSave;       // Field indices needing text value lists.
    private MissingFieldPolicy _missingPolicy;

    this(string operatorName, size_t fieldIndex, MissingFieldPolicy missingPolicy,
         Flag!"useHeaderSuffix" useHeaderSuffix = Yes.useHeaderSuffix,
         Flag!"allowCustomHeader" allowCustomHeader = Yes.allowCustomHeader)
    {
        _name = operatorName;
        _fieldIndex = fieldIndex;
        _missingPolicy = missingPolicy;
        _useHeaderSuffix = useHeaderSuffix;
        _allowCustomHeader = allowCustomHeader;

        /* Install the default header now; a custom header or an input header line
         * may override it later.
         */
        immutable suffix = _useHeaderSuffix ? operatorName : "";
        _header = summaryHeaderFromFieldHeader(fieldHeaderFromIndex(fieldIndex), suffix);
    }

    /* Installs a user-specified output header. Only valid when the operator allows
     * custom headers.
     */
    void setCustomHeader (string customHeader)
    {
        assert(_allowCustomHeader);
        _hasCustomHeader = true;
        _header = customHeader;
    }

    /* Derives the output header from the input header line, unless a custom header
     * is already in place.
     */
    void processHeaderLine(const char[][] fields)
    {
        if (_hasCustomHeader) return;
        debug writefln("[%s %d] fields: %s", __FUNCTION__, _fieldIndex, fields.to!string);
        _header = summaryHeaderFromFieldHeader(fields[_fieldIndex].to!string,
                                               _useHeaderSuffix ? _name : "");
    }

    /* The operator name, e.g. "mean". */
    final string name() const @property
    {
        return _name;
    }

    /* The output header for this operator's summary field. */
    final string header() const @property
    {
        return _header;
    }

    /* Whether this operator accepts a user-specified header. */
    final bool allowCustomHeader() const @property
    {
        return _allowCustomHeader;
    }

    /* Whether the operator name is appended to default headers. */
    final bool useHeaderSuffix() const @property
    {
        return _useHeaderSuffix;
    }

    /* Zero-based index of the field being summarized. */
    final size_t fieldIndex() const @property
    {
        return _fieldIndex;
    }

    /* The missing-value policy applied by this operator's calculators. */
    final MissingFieldPolicy missingPolicy() @property
    {
        return _missingPolicy;
    }

    /* setSaveFieldValues[Numeric|Text] are called by derived classes to request
     * that the field's values be saved for the final calculation. They should be
     * called during construction.
     */
    final void setSaveFieldValuesNumeric()
    {
        _numericFieldsToSave ~= _fieldIndex;
    }

    final void setSaveFieldValuesText()
    {
        _textFieldsToSave ~= _fieldIndex;
    }

    /* Numeric field indices this operator needs saved (Operator interface). */
    final size_t[] numericFieldsToSave()
    {
        return _numericFieldsToSave;
    }

    /* Text field indices this operator needs saved (Operator interface). */
    final size_t[] textFieldsToSave()
    {
        return _textFieldsToSave;
    }

    abstract SingleFieldCalculator makeCalculator();
}

/** SingleFieldCalculator is a base class for the common case of calculators using
 * a single field. Derived classes implement processNextField() rather than
 * processNextLine().
 */
class SingleFieldCalculator : Calculator
{
    private size_t _fieldIndex;    // Zero-based index of the field this calculator reads.

    this(size_t fieldIndex)
    {
        _fieldIndex = fieldIndex;
    }

    /* Zero-based index of the input field being summarized. */
    final size_t fieldIndex() const @property
    {
        return _fieldIndex;
    }

    /* Extracts this calculator's field from the input line and forwards it to
     * processNextField, applying the operator's missing-value policy: missing
     * fields are passed through, replaced, or skipped.
     */
    final void processNextLine(const char[][] fields)
    {
        debug writefln("[%s %d] fields: %s", __FUNCTION__, _fieldIndex, fields.to!string);

        auto policy = getOperator.missingPolicy;
        auto fieldValue = fields[_fieldIndex];

        if (!policy.isMissingField(fieldValue) || policy.useMissing)
        {
            processNextField(fieldValue);
        }
        else if (policy.replaceMissing)
        {
            processNextField(policy.missingReplacement);
        }
    }

    /* Derived classes return the Operator that created this calculator. */
    abstract SingleFieldOperator getOperator();

    /* Derived classes process the single field value here. */
    abstract void processNextField(const char[] field);
}

/* Unittest helper functions. Only compiled when -unittest is in effect. */
version(unittest)
{
    /** A helper for SingleFieldOperator unit tests.
     *
     * testSingleFieldOperator takes a set of split file values, a field index, a header
     * suffix, and a set of expected values. The expected values array contains the
     * initial value (zero entries) and the expected values after each line. (One more
     * expected value than input lines.) The zero entry case is what is generated for an
     * empty file. An example testing the 'min' operator against a file with 2 columns,
     * 3 rows, using field index 1:
     *
     *    testSingleFieldOperator!MinOperator(
     *       [["10", "100"],               // The split file. 3 lines by 2 columns.
     *        ["5", "50"],
     *        ["20", "200"]],
     *       1,                            // Field index (zero-based, so "100", "50", "200")
     *       "min",                        // The header suffix, normally the operator name.
     *       ["nan", "100", "50", "50"]);  // Min value after processing each line.
     *
     * A typical operator unit test uses three "files", one each of 1x3, 2x3, and 3x3.
     * The operator is then tested against each column, a total of six calls. Headers
     * are automatically checked. Additional entries can be used to extend coverage.
     *
     * A non-default MissingFieldPolicy can be provided as an optional last argument.
     * Operator tests should include exclusion and replacement variations. See operator
     * unit tests for details.
     *
     * The testSingleFieldOperatorBase adds an additional capability - Custom operator
     * init arguments. Currently this is used only by the quantile operator.
     *
     * These tests do not check unique key behavior (group-by). Operators don't have info
     * about unique keys, and interact with them only indirectly, via Calculators.
     */
    void testSingleFieldOperator(OperatorClass : SingleFieldOperator)
        (const char[][][] splitFile, size_t fieldIndex, string headerSuffix,
         const char[][] expectedValues,
         MissingFieldPolicy missingPolicy = new MissingFieldPolicy)
    {
        testSingleFieldOperatorBase!OperatorClass(splitFile, fieldIndex, headerSuffix, expectedValues, missingPolicy);
    }

    /* Implementation of testSingleFieldOperator, with support for extra operator
     * constructor arguments (currently used only by the quantile operator). See the
     * testSingleFieldOperator documentation for the overall test design.
     */
    void testSingleFieldOperatorBase(OperatorClass : SingleFieldOperator, T...)
        (const char[][][] splitFile, size_t fieldIndex, string headerSuffix,
         const char[][] expectedValues,
         MissingFieldPolicy missingPolicy,
         T extraOpInitArgs)
    {
        import std.format : format;
        import std.array : appender;
        import std.string : chomp;
        import std.traits : EnumMembers;

        auto numFields = (splitFile[0]).length;

        assert(fieldIndex < numFields,
               format("[testSingleFieldOperator] Invalid field index. headerSuffix: %s",
                      headerSuffix));
        assert(splitFile.length + 1 == expectedValues.length,
               format("[testSingleFieldOperator] Need one more expected value than number of rows. headerSuffix: %s",
                      headerSuffix));

        /* printOptions - Only the 'values-delimiter' (2nd arg) is used in these tests. */
        auto printOptions = SummarizerPrintOptions('#', '|');

        /* An input header line. */
        string[] inputHeaderLine = new string[numFields];
        foreach (i; 0 .. numFields) inputHeaderLine[i] = "header" ~ i.to!string;

        /* The different expected output field headers. */
        auto outputFieldHeaderWithNoHeaderLine =
            fieldHeaderFromIndex(fieldIndex)
            .summaryHeaderFromFieldHeader(headerSuffix);
        auto outputFieldHeaderFromHeaderLine =
            inputHeaderLine[fieldIndex]
            .summaryHeaderFromFieldHeader(headerSuffix);
        auto customOutputFieldHeader = "custom";

        enum HeaderUsecase {
            HeaderLine_DefaultHeader,
            HeaderLine_CustomHeader,
            NoHeaderLine_DefaultHeader,
            NoHeaderLine_CustomHeader,
            NoHeaderLine_NoOutputHeader,
        }

        string headerAssertMessage(Operator op, HeaderUsecase hc, const char[] actual, const char[] expected)
        {
            return format("[testSingleFieldOperator] Unexpected header. Operator: %s; Usecase: %s;  Actual: '%s';  Expected: '%s'",
                          op.name, hc, actual, expected);
        }

        string valueAssertMessage(Operator op, HeaderUsecase hc, size_t rowIndex, size_t fieldIndex,
                                  const char[] actual, const char[] expected)
        {
            return format("[testSingleFieldOperator] Unexpected value. Operator: %s; Usecase: %s;  RowIndex: %d, FieldIndex: %d\n    Actual: '%s';  Expected: '%s'",
                          op.name, hc, rowIndex, fieldIndex, actual, expected);
        }

        /* Run the logic for each header use case. */
        foreach (hc; EnumMembers!HeaderUsecase)
        {
            bool hasInputHeader = (
                hc == HeaderUsecase.HeaderLine_DefaultHeader ||
                hc == HeaderUsecase.HeaderLine_CustomHeader
                );
            bool hasOutputHeader = (
                hc == HeaderUsecase.HeaderLine_DefaultHeader ||
                hc == HeaderUsecase.HeaderLine_CustomHeader ||
                hc == HeaderUsecase.NoHeaderLine_DefaultHeader ||
                hc == HeaderUsecase.NoHeaderLine_CustomHeader
                );
            bool hasCustomHeader = (
                hc == HeaderUsecase.HeaderLine_CustomHeader ||
                hc == HeaderUsecase.NoHeaderLine_CustomHeader
                );

            if (hasCustomHeader) assert(hasOutputHeader);

            auto op = new OperatorClass(fieldIndex, missingPolicy, extraOpInitArgs);

            if (hasCustomHeader)
            {
                if (!op.allowCustomHeader) continue;   // Custom header not supported by this operator
                op.setCustomHeader(customOutputFieldHeader);
            }

            Operator[] operatorArray;
            operatorArray ~= op;

            auto summarizer = new NoKeySummarizer!(typeof(appender!(char[])()))('#', missingPolicy);
            summarizer.setOperators(inputRangeObject(operatorArray));

            if (hasInputHeader) summarizer.processHeaderLine(inputHeaderLine);

            if (hasOutputHeader)
            {
                /* Write the header line. Note that this is a one-field header. */
                auto headerLineOutput = appender!(char[])();
                summarizer.writeSummaryHeader(headerLineOutput, printOptions);

                /* Test that the header was generated correctly.
                 *
                 * Note: Because the output is generated by a Summarizer, it will have a
                 * trailing newline. Use chomp to trim it.
                 */
                final switch (hc)
                {
                case HeaderUsecase.HeaderLine_DefaultHeader:
                    assert(headerLineOutput.data.chomp == outputFieldHeaderFromHeaderLine,
                           headerAssertMessage(operatorArray[0], hc, headerLineOutput.data.chomp,
                                               outputFieldHeaderFromHeaderLine));
                    break;
                case HeaderUsecase.NoHeaderLine_DefaultHeader:
                    assert(headerLineOutput.data.chomp == outputFieldHeaderWithNoHeaderLine,
                           headerAssertMessage(operatorArray[0], hc, headerLineOutput.data.chomp,
                                               outputFieldHeaderWithNoHeaderLine));
                    break;
                case HeaderUsecase.HeaderLine_CustomHeader:
                case HeaderUsecase.NoHeaderLine_CustomHeader:
                    assert(headerLineOutput.data.chomp == customOutputFieldHeader,
                           headerAssertMessage(operatorArray[0], hc, headerLineOutput.data.chomp,
                                               customOutputFieldHeader));
                    break;
                case HeaderUsecase.NoHeaderLine_NoOutputHeader:
                    break;
                }

            }

            /* For each line, process the line, generate the output, and test that the
             * value is correct. Start with the empty file case.
             */
            foreach (i, const char[] expected; expectedValues)
            {
                if (i > 0) summarizer.processNextLine(splitFile[i - 1]);
                auto summaryLineOutput = appender!(char[])();
                summarizer.writeSummaryBody(summaryLineOutput, printOptions);
                assert(summaryLineOutput.data.chomp == expected,
                       valueAssertMessage(operatorArray[0], hc, i, fieldIndex,
                                          summaryLineOutput.data.chomp, expectedValues[i]));
            }
        }
    }
}

/** ZeroFieldOperator is a base class for operators that take no input. The main use
 * case is the CountOperator, which counts the occurrences of each unique key. Other
 * uses are possible, for example, weighted random number assignment.
 *
 * The primary rationale for ZeroFieldOperator and ZeroFieldCalculator is to clarify
 * the information available to such a routine. In particular, the split fields passed
 * to processHeaderLine and processNextLine don't include all fields in the input,
 * something that might not be obvious when implementing an operator. (Only fields
 * required by operators acting on specific fields are included.)
 */
class ZeroFieldOperator : Operator
{
    private string _name;
    private string _header;

    this(string operatorName)
    {
        _name = operatorName;
        _header = operatorName;    // Default header is the operator name.
    }

    /** Replaces the default header with a custom one. */
    void setCustomHeader (string customHeader)
    {
        _header = customHeader;
    }

    /** Zero field operators always support custom headers. */
    bool allowCustomHeader() const @property
    {
        return true;
    }

    /** The operator's name, e.g. "count". */
    final string name() const @property
    {
        return _name;
    }

    /** The output header, either the default (operator name) or a custom header. */
    final string header() const @property
    {
        return _header;
    }

    /* A no-op. ZeroFieldOperators have no access to the header line. */
    final void processHeaderLine(const char[][] fields) { }

    /* A no-op. ZeroFieldOperators have no access to fields. */
    final size_t[] numericFieldsToSave()
    {
        size_t[] emptyArray;
        return emptyArray;
    }

    /* A no-op. ZeroFieldOperators have no access to fields. */
    final size_t[] textFieldsToSave()
    {
        size_t[] emptyArray;
        return emptyArray;
    }

    /** Creates a calculator instance for this operator. */
    abstract ZeroFieldCalculator makeCalculator();
}

/** ZeroFieldCalculator is a base class for operators that don't use fields as input.
 * In particular, the Count operator. It is a companion to the ZeroFieldOperator class.
 *
 * Derived classes implement processNextEntry() rather than processNextLine(), and the
 * single argument form of calculate() given as an abstract function.
 */
class ZeroFieldCalculator : Calculator
{
    this() { }

    /** Ignores the fields and simply records that an entry was seen. */
    final void processNextLine(const char[][] fields)
    {
        debug writefln("[%s]", __FUNCTION__);
        processNextEntry();
    }

    /** Forwards to the single-argument calculate; saved values are not used. */
    final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
    {
        return calculate(printOptions);
    }

    /** Called once per input line. */
    abstract void processNextEntry();

    /** Produces the summary value for the output line. */
    abstract string calculate(const ref SummarizerPrintOptions printOptions);
}

version(unittest)
{
    /* A helper for ZeroFieldOperator unit tests.
     *
     * testZeroFieldOperator takes a set of split file values, a default header, and a
     * set of expected values. The expected values array contains the expected values
     * after each line.
     *
     * testZeroFieldOperator is very similar to testSingleFieldOperator, except that
     * there is no use of field indices and fewer types of headers. See the latter's
     * documentation and the CountOperator unit tests for examples.
     */
    void testZeroFieldOperator(OperatorClass : ZeroFieldOperator)
        (const char[][][] splitFile, string defaultHeader, const char[][] expectedValues)
    {
        import std.format : format;
        import std.array : appender;
        import std.string : chomp;
        import std.traits : EnumMembers;

        auto numFields = (splitFile[0]).length;

        assert(splitFile.length + 1 == expectedValues.length,
               format("[testZeroFieldOperator] Need one more expected value than number of rows. defaultHeader: %s",
                      defaultHeader));

        /* printOptions - Not used in these tests, but needed for API calls. */
        auto printOptions = SummarizerPrintOptions('#', '|');

        /* Missing policy doesn't apply to zero field operators, but need the object for the summarizer. */
        auto missingPolicy = new MissingFieldPolicy;

        /* An input header line. */
        string[] inputHeaderLine = new string[numFields];
        foreach (i; 0 .. numFields) inputHeaderLine[i] = "header" ~ i.to!string;

        auto customOutputFieldHeader = "custom";

        enum HeaderUsecase {
            HeaderLine_DefaultHeader,
            HeaderLine_CustomHeader,
            NoHeaderLine_DefaultHeader,
            NoHeaderLine_CustomHeader,
            NoHeaderLine_NoOutputHeader,
        }

        string headerAssertMessage(Operator op, HeaderUsecase hc, const char[] actual, const char[] expected)
        {
            return format("[testZeroFieldOperator] Unexpected header. Operator: %s; Usecase: %s;  Actual: '%s';  Expected: '%s'",
                          op.name, hc, actual, expected);
        }

        string valueAssertMessage(Operator op, HeaderUsecase hc, size_t rowIndex,
                                  const char[] actual, const char[] expected)
        {
            return format("[testZeroFieldOperator] Unexpected value. Operator: %s; Usecase: %s;  RowIndex: %d\n    Actual: '%s';  Expected: '%s'",
                          op.name, hc, rowIndex, actual, expected);
        }

        /* Run the logic for each header use case. */
        foreach (hc; EnumMembers!HeaderUsecase)
        {
            bool hasInputHeader = (
                hc == HeaderUsecase.HeaderLine_DefaultHeader ||
                hc == HeaderUsecase.HeaderLine_CustomHeader
                );
            bool hasOutputHeader = (
                hc == HeaderUsecase.HeaderLine_DefaultHeader ||
                hc == HeaderUsecase.HeaderLine_CustomHeader ||
                hc == HeaderUsecase.NoHeaderLine_DefaultHeader ||
                hc == HeaderUsecase.NoHeaderLine_CustomHeader
                );
            bool hasCustomHeader = (
                hc == HeaderUsecase.HeaderLine_CustomHeader ||
                hc == HeaderUsecase.NoHeaderLine_CustomHeader
                );

            if (hasCustomHeader) assert(hasOutputHeader);

            auto op = new OperatorClass();

            if (hasCustomHeader)
            {
                if (!op.allowCustomHeader) continue;   // Custom header not supported by this operator
                op.setCustomHeader(customOutputFieldHeader);
            }

            Operator[] operatorArray;
            operatorArray ~= op;

            auto summarizer = new NoKeySummarizer!(typeof(appender!(char[])()))('#', missingPolicy);
            summarizer.setOperators(inputRangeObject(operatorArray));
            if (hasInputHeader) summarizer.processHeaderLine(inputHeaderLine);

            if (hasOutputHeader)
            {
                /* Write the header line. Note that this is a one-field header. */
                auto headerLineOutput = appender!(char[])();
                summarizer.writeSummaryHeader(headerLineOutput, printOptions);

                /* Test that the header was generated correctly.
                 *
                 * Note: Because the output is generated by a Summarizer, it will have a
                 * trailing newline. Use chomp to trim it.
                 */
                final switch (hc)
                {
                case HeaderUsecase.HeaderLine_DefaultHeader:
                case HeaderUsecase.NoHeaderLine_DefaultHeader:
                    assert(headerLineOutput.data.chomp == defaultHeader,
                           headerAssertMessage(operatorArray[0], hc, headerLineOutput.data.chomp,
                                               defaultHeader));
                    break;
                case HeaderUsecase.HeaderLine_CustomHeader:
                case HeaderUsecase.NoHeaderLine_CustomHeader:
                    assert(headerLineOutput.data.chomp == customOutputFieldHeader,
                           headerAssertMessage(operatorArray[0], hc, headerLineOutput.data.chomp,
                                               customOutputFieldHeader));
                    break;
                case HeaderUsecase.NoHeaderLine_NoOutputHeader:
                    break;
                }

            }

            /* For each line, process the line, generate the output, and test that the
             * value is correct. Start with the empty file case.
             */
            foreach (i, const char[] expected; expectedValues)
            {
                if (i > 0) summarizer.processNextLine(splitFile[i - 1]);
                auto summaryLineOutput = appender!(char[])();
                summarizer.writeSummaryBody(summaryLineOutput, printOptions);
                assert(summaryLineOutput.data.chomp == expected,
                       valueAssertMessage(operatorArray[0], hc, i,
                                          summaryLineOutput.data.chomp, expectedValues[i]));
            }
        }
    }
}

/* Specific operators.
 *
 * Notes:
 * - The 'Calculator' inner classes are 'static'. This means inner class instances do not
 *   keep a reference to the context of the outer class. In exchange, Calculator instances
 *   need to hold all needed state, typically the field index they are summarizing.
 */

/** CountOperator counts the number of occurrences of each unique key, or the number of
 * input lines if there is no unique key.
 *
 * CountOperator differs from most other operators in that it doesn't summarize a specific
 * field on the line. Instead it is summarizing a property of the unique key itself. For
 * this reason it doesn't derive from SingleFieldOperator.
 */
final class CountOperator : ZeroFieldOperator
{
    this()
    {
        super("count");
    }

    final override ZeroFieldCalculator makeCalculator()
    {
        return new CountCalculator();
    }

    static final class CountCalculator : ZeroFieldCalculator
    {
        private size_t _numEntries = 0;    // Number of entries (lines) seen so far.

        /* Each entry bumps the count; no field data is involved. */
        final override void processNextEntry()
        {
            ++_numEntries;
        }

        final override string calculate(const ref SummarizerPrintOptions printOptions)
        {
            return printOptions.formatNumber(_numEntries);
        }
    }
}

unittest // CountOperator
{
    auto file1x3 = [["10"], ["9.5"], ["11"]];
    auto file2x3 = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto file3x3 = [["9009", "9", "-4.5"], ["199", "0", "-0.5"], ["3003", "0.2", "12"]];

    /* Count ignores field values entirely, so every file counts 0 through 3. */
    foreach (file; [file1x3, file2x3, file3x3])
    {
        testZeroFieldOperator!CountOperator(file, "count", ["0", "1", "2", "3"]);
    }
}

/** RetainOperator retains the first occurrence of a field, without changing the header.
 *
 * RetainOperator is intended for fields where the value is expected to be the same for
 * all occurrences of the unique key, and the goal is to pass the value through unchanged.
 * It is like FirstOperator, except that the original header is preserved. The original
 * header preservation is setup in the call to the SingleFieldOperation constructor.
 *
 * Notes:
 * - An option to signal an error if multiple values are encountered might be useful.
 */
final class RetainOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("retain", fieldIndex, missingPolicy, No.useHeaderSuffix, No.allowCustomHeader);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new RetainCalculator(fieldIndex);
    }

    final class RetainCalculator : SingleFieldCalculator
    {
        private bool _haveValue = false;   // True once the first value has been captured.
        private string _value = "";

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override RetainOperator getOperator()
        {
            return this.outer;
        }

        /* Keeps the first field value seen; subsequent values are ignored. */
        final override void processNextField(const char[] nextField)
        {
            if (_haveValue) return;
            _value = nextField.to!string;
            _haveValue = true;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _value;
        }
    }
}

unittest // RetainOperator
{
    auto file1x3 = [["r1c1"], ["r2c1"], ["r3c1"]];
    auto file2x3 = [["r1c1", "r1c2"], ["r2c1", "r2c2"], ["r3c1", "r3c2"]];
    auto file3x3 = [["r1c1", "r1c2", "r1c3"], ["r2c1", "r2c2", "r2c3"], ["r3c1", "r3c2", "r3c3"]];

    /* Empty header suffix reflects RetainOperator's No.useHeaderSuffix setting. */
    testSingleFieldOperator!RetainOperator(file1x3, 0, "", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!RetainOperator(file2x3, 0, "", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!RetainOperator(file2x3, 1, "", ["", "r1c2", "r1c2", "r1c2"]);
    testSingleFieldOperator!RetainOperator(file3x3, 0, "", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!RetainOperator(file3x3, 1, "", ["", "r1c2", "r1c2", "r1c2"]);
    testSingleFieldOperator!RetainOperator(file3x3, 2, "", ["", "r1c3", "r1c3", "r1c3"]);

    auto fileWithMissing = [[""], ["r2c1"], ["r3c1"]];
    testSingleFieldOperator!RetainOperator(fileWithMissing, 0, "", ["", "", "r2c1", "r2c1"],
                                           new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!RetainOperator(fileWithMissing, 0, "", ["", "NA", "NA", "NA"],
                                           new MissingFieldPolicy(false, "NA"));  // Replace missing
}

/** FirstOperator outputs the first value found for the field.
 */
final class FirstOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("first", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new FirstCalculator(fieldIndex);
    }

    final class FirstCalculator : SingleFieldCalculator
    {
        private bool _haveValue = false;   // True once the first value has been captured.
        private string _value = "";

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override FirstOperator getOperator()
        {
            return this.outer;
        }

        /* Keeps the first field value seen; subsequent values are ignored. */
        final override void processNextField(const char[] nextField)
        {
            if (_haveValue) return;
            _value = nextField.to!string;
            _haveValue = true;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _value;
        }
    }
}

unittest // FirstOperator
{
    auto file1x3 = [["r1c1"], ["r2c1"], ["r3c1"]];
    auto file2x3 = [["r1c1", "r1c2"], ["r2c1", "r2c2"], ["r3c1", "r3c2"]];
    auto file3x3 = [["r1c1", "r1c2", "r1c3"], ["r2c1", "r2c2", "r2c3"], ["r3c1", "r3c2", "r3c3"]];

    testSingleFieldOperator!FirstOperator(file1x3, 0, "first", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!FirstOperator(file2x3, 0, "first", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!FirstOperator(file2x3, 1, "first", ["", "r1c2", "r1c2", "r1c2"]);
    testSingleFieldOperator!FirstOperator(file3x3, 0, "first", ["", "r1c1", "r1c1", "r1c1"]);
    testSingleFieldOperator!FirstOperator(file3x3, 1, "first", ["", "r1c2", "r1c2", "r1c2"]);
    testSingleFieldOperator!FirstOperator(file3x3, 2, "first", ["", "r1c3", "r1c3", "r1c3"]);

    auto fileWithMissing = [[""], ["r2c1"], ["r3c1"]];
    testSingleFieldOperator!FirstOperator(fileWithMissing, 0, "first", ["", "", "r2c1", "r2c1"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!FirstOperator(fileWithMissing, 0, "first", ["", "NA", "NA", "NA"],
                                          new MissingFieldPolicy(false, "NA"));  // Replace missing
}

/** LastOperator outputs the last value found for the field.
 */
final class LastOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("last", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new LastCalculator(fieldIndex);
    }

    final class LastCalculator : SingleFieldCalculator
    {
        private string _lastValue = "";   // Most recent field value; empty until a value is seen.

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override LastOperator getOperator()
        {
            return this.outer;
        }

        /* Every field value overwrites the previous one; the last survives. */
        final override void processNextField(const char[] nextField)
        {
            _lastValue = nextField.to!string;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _lastValue;
        }
    }
}

unittest // LastOperator
{
    auto file1x3 = [["r1c1"], ["r2c1"], ["r3c1"]];
    auto file2x3 = [["r1c1", "r1c2"], ["r2c1", "r2c2"], ["r3c1", "r3c2"]];
    auto file3x3 = [["r1c1", "r1c2", "r1c3"], ["r2c1", "r2c2", "r2c3"], ["r3c1", "r3c2", "r3c3"]];

    testSingleFieldOperator!LastOperator(file1x3, 0, "last", ["", "r1c1", "r2c1", "r3c1"]);
    testSingleFieldOperator!LastOperator(file2x3, 0, "last", ["", "r1c1", "r2c1", "r3c1"]);
    testSingleFieldOperator!LastOperator(file2x3, 1, "last", ["", "r1c2", "r2c2", "r3c2"]);
    testSingleFieldOperator!LastOperator(file3x3, 0, "last", ["", "r1c1", "r2c1", "r3c1"]);
    testSingleFieldOperator!LastOperator(file3x3, 1, "last", ["", "r1c2", "r2c2", "r3c2"]);
    testSingleFieldOperator!LastOperator(file3x3, 2, "last", ["", "r1c3", "r2c3", "r3c3"]);

    auto fileWithMissing = [[""], ["r2c1"], ["r3c1"]];
    testSingleFieldOperator!LastOperator(fileWithMissing, 0, "last", ["", "", "r2c1", "r3c1"],
                                         new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!LastOperator(fileWithMissing, 0, "last", ["", "NA", "r2c1", "r3c1"],
                                         new MissingFieldPolicy(false, "NA"));  // Replace missing
}

/** MinOperator outputs the minimum value for the field. This is a numeric operator.
 *
 * This operator returns the original string without additional numeric formatting.
 * This can be useful when joining back to the original data. This is different than
 * numeric operators that perform calculations.
 */
final class MinOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("min", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MinCalculator(fieldIndex);
    }

    final class MinCalculator : SingleFieldCalculator
    {
        private bool _isFirst = true;
        private double _value = double.nan;
        private string _originalString = "nan";  // Note: Cannot format floats at compile time (version 2.087)

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MinOperator getOperator()
        {
            return this.outer;
        }

        /* Tracks the smallest numeric value seen, retaining its original input text. */
        final override void processNextField(const char[] nextField)
        {
            immutable double candidate = nextField.to!double;
            if (_isFirst || candidate < _value)
            {
                _value = candidate;
                _originalString = nextField.to!string;
                _isFirst = false;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _originalString;
        }
    }
}

unittest // MinOperator
{
    auto file1x3 = [["10"], ["9.5"], ["11"]];
    auto file2x3 = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto file3x3 = [["9009", "9", "-4.5"], ["199", "0", "-0.5"], ["3003", "0.2", "12"]];

    testSingleFieldOperator!MinOperator(file1x3, 0, "min", ["nan", "10", "9.5", "9.5"]);
    testSingleFieldOperator!MinOperator(file2x3, 0, "min", ["nan", "20", "20", "20"]);
    testSingleFieldOperator!MinOperator(file2x3, 1, "min", ["nan", "-30", "-30", "-31"]);
    testSingleFieldOperator!MinOperator(file3x3, 0, "min", ["nan", "9009", "199", "199"]);
    testSingleFieldOperator!MinOperator(file3x3, 1, "min", ["nan", "9", "0", "0"]);
    testSingleFieldOperator!MinOperator(file3x3, 2, "min", ["nan", "-4.5", "-4.5", "-4.5"]);

    auto fileWithMissing = [[""], ["10"], ["-10"]];
    testSingleFieldOperator!MinOperator(fileWithMissing, 0, "min", ["nan", "nan", "10", "-10"],
                                        new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!MinOperator(fileWithMissing, 0, "min", ["nan", "5", "5", "-10"],
                                        new MissingFieldPolicy(false, "5"));  // Replace missing
}

/** MaxOperator outputs the maximum value for the field. This is a numeric operator.
 *
 * This operator returns the original string without additional numeric formatting.
 * This can be useful when joining back to the original data. This is different than
 * numeric operators that perform calculations.
 */
final class MaxOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("max", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MaxCalculator(fieldIndex);
    }

    final class MaxCalculator : SingleFieldCalculator
    {
        private bool _isFirst = true;
        private double _value = double.nan;
        private string _originalString = "nan";  // Note: Cannot format floats at compile time (version 2.087)

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MaxOperator getOperator()
        {
            return this.outer;
        }

        /* Tracks the largest numeric value seen, retaining its original input text. */
        final override void processNextField(const char[] nextField)
        {
            immutable double candidate = nextField.to!double;
            if (_isFirst || candidate > _value)
            {
                _value = candidate;
                _originalString = nextField.to!string;
                _isFirst = false;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _originalString;
        }
    }
}

unittest // MaxOperator
{
    /* colNFile fixtures: one row per entry, N columns per row. Each expected-results
     * array appears to hold the cumulative operator output after each prefix of rows,
     * starting with zero rows ("nan") -- TODO confirm against testSingleFieldOperator
     * (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["9.5"], ["11"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["199", "0", "-0.5"], ["3003", "0.2", "12"]];

    testSingleFieldOperator!MaxOperator(col1File, 0, "max", ["nan", "10", "10", "11"]);
    testSingleFieldOperator!MaxOperator(col2File, 0, "max", ["nan", "20", "21", "22"]);
    testSingleFieldOperator!MaxOperator(col2File, 1, "max", ["nan", "-30", "-29", "-29"]);
    testSingleFieldOperator!MaxOperator(col3File, 0, "max", ["nan", "9009", "9009", "9009"]);
    testSingleFieldOperator!MaxOperator(col3File, 1, "max", ["nan", "9", "9", "9"]);
    testSingleFieldOperator!MaxOperator(col3File, 2, "max", ["nan", "-4.5", "-0.5", "12"]);

    /* Missing-value handling: excluded fields are skipped; replaced fields use the
     * policy's replacement string ("5" below).
     */
    auto col1misFile = [[""], ["-10"], ["10"]];
    testSingleFieldOperator!MaxOperator(col1misFile, 0, "max", ["nan", "nan", "-10", "10"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!MaxOperator(col1misFile, 0, "max", ["nan", "5", "5", "10"],
                                          new MissingFieldPolicy(false, "5"));  // Replace missing
}

/** RangeOperator outputs the difference between the minimum and maximum values.
 *
 * With a single value, or when all values are equal, the range is zero. This is
 * a numeric operator.
 */
final class RangeOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("range", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new RangeCalculator(fieldIndex);
    }

    final class RangeCalculator : SingleFieldCalculator
    {
        private bool _haveValue = false;   // True once at least one field has been seen
        private double _minValue = 0.0;    // Smallest value seen so far
        private double _maxValue = 0.0;    // Largest value seen so far

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override RangeOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            immutable double v = nextField.to!double;

            if (!_haveValue)
            {
                /* First value initializes both endpoints. */
                _minValue = v;
                _maxValue = v;
                _haveValue = true;
            }
            else if (v > _maxValue)
            {
                _maxValue = v;
            }
            else if (v < _minValue)
            {
                _minValue = v;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            /* Zero when no values were seen (both endpoints still 0.0). */
            return printOptions.formatNumber(_maxValue - _minValue);
        }
    }
}

unittest // RangeOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows;
     * range starts at "0" (not "nan") even for the empty prefix -- TODO confirm
     * against testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["9.5"], ["11"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["199", "0", "-0.5"], ["3003", "0.2", "12"]];

    testSingleFieldOperator!RangeOperator(col1File, 0, "range", ["0", "0", "0.5", "1.5"]);
    testSingleFieldOperator!RangeOperator(col2File, 0, "range", ["0", "0", "1", "2"]);
    testSingleFieldOperator!RangeOperator(col2File, 1, "range", ["0", "0", "1", "2"]);
    testSingleFieldOperator!RangeOperator(col3File, 0, "range", ["0", "0", "8810", "8810"]);
    testSingleFieldOperator!RangeOperator(col3File, 1, "range", ["0", "0", "9", "9"]);
    testSingleFieldOperator!RangeOperator(col3File, 2, "range", ["0", "0", "4", "16.5"]);

    /* Missing-value handling: exclusion skips the empty fields; replacement
     * substitutes "5.5" for them.
     */
    auto col1misFile = [[""], ["10"], [""], ["9.5"], ["11"]];
    testSingleFieldOperator!RangeOperator(col1misFile, 0, "range", ["0", "0", "0", "0", "0.5", "1.5"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!RangeOperator(col1misFile, 0, "range", ["0", "0", "4.5", "4.5", "4.5", "5.5"],
                                          new MissingFieldPolicy(false, "5.5"));  // Replace missing
}

/** SumOperator produces the sum of all the values. This is a numeric operator.
 *
 * The sum is accumulated incrementally; no per-key value storage is needed.
 */
final class SumOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("sum", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new SumCalculator(fieldIndex);
    }

    final class SumCalculator : SingleFieldCalculator
    {
        private double _sum = 0.0;   // Running total; zero when no fields have been seen

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override SumOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            _sum += nextField.to!double;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return printOptions.formatNumber(_sum);
        }
    }
}

unittest // SumOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "0" for the empty prefix -- TODO confirm against
     * testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["9.5"], ["11"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["199", "0", "-0.5"], ["3003", "0.2", "12"]];

    testSingleFieldOperator!SumOperator(col1File, 0, "sum", ["0", "10", "19.5", "30.5"]);
    testSingleFieldOperator!SumOperator(col2File, 0, "sum", ["0", "20", "41", "63"]);
    testSingleFieldOperator!SumOperator(col2File, 1, "sum", ["0", "-30", "-59", "-90"]);
    testSingleFieldOperator!SumOperator(col3File, 0, "sum", ["0", "9009", "9208", "12211"]);
    testSingleFieldOperator!SumOperator(col3File, 1, "sum", ["0", "9", "9", "9.2"]);
    testSingleFieldOperator!SumOperator(col3File, 2, "sum", ["0", "-4.5", "-5", "7"]);

    /* Missing-value handling: excluded empty fields contribute nothing; replaced
     * fields contribute the replacement value "1.5".
     */
    auto col1misFile = [[""], ["10"], [""], ["9.5"], ["11"]];
    testSingleFieldOperator!SumOperator(col1misFile, 0, "sum", ["0", "0", "10", "10", "19.5", "30.5"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!SumOperator(col1misFile, 0, "sum", ["0", "1.5", "11.5", "13", "22.5", "33.5"],
                                          new MissingFieldPolicy(false, "1.5"));  // Replace missing
}

/** MeanOperator produces the mean (average) of all the values. This is a numeric operator.
 *
 * A running total and count are kept; the division happens only when the result
 * is requested.
 */
final class MeanOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("mean", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MeanCalculator(fieldIndex);
    }

    final class MeanCalculator : SingleFieldCalculator
    {
        private double _sum = 0.0;       // Running total of values seen
        private size_t _numValues = 0;   // Number of values seen

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MeanOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            _sum += nextField.to!double;
            ++_numValues;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            /* The mean of an empty set is undefined; report NaN. */
            double mean = double.nan;
            if (_numValues > 0) mean = _sum / _numValues.to!double;
            return printOptions.formatNumber(mean);
        }
    }
}

unittest // MeanOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "nan" for the empty prefix -- TODO confirm against
     * testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["9.5"], ["7.5"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["9", "0", "-1.5"], ["4509", "-3", "12"]];

    testSingleFieldOperator!MeanOperator(col1File, 0, "mean", ["nan", "10", "9.75", "9"]);
    testSingleFieldOperator!MeanOperator(col2File, 0, "mean", ["nan", "20", "20.5", "21"]);
    testSingleFieldOperator!MeanOperator(col2File, 1, "mean", ["nan", "-30", "-29.5", "-30"]);
    testSingleFieldOperator!MeanOperator(col3File, 0, "mean", ["nan", "9009", "4509", "4509"]);
    testSingleFieldOperator!MeanOperator(col3File, 1, "mean", ["nan", "9", "4.5", "2"]);
    testSingleFieldOperator!MeanOperator(col3File, 2, "mean", ["nan", "-4.5", "-3", "2"]);

    /* Missing-value handling: exclusion drops empty fields from both the sum and
     * the count; replacement counts them with value "0".
     */
    auto col1misFile = [[""], ["6"], [""], ["14"], ["40"]];
    testSingleFieldOperator!MeanOperator(col1misFile, 0, "mean", ["nan", "nan", "6", "6", "10", "20"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!MeanOperator(col1misFile, 0, "mean", ["nan", "0", "3", "2", "5", "12"],
                                          new MissingFieldPolicy(false, "0"));  // Replace missing
}

/** MedianOperator produces the median of all the values. This is a numeric operator.
 *
 * The calculation needs the full set of values, so they are saved in memory via
 * the unique key value lists machinery rather than aggregated incrementally.
 */
final class MedianOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("median", fieldIndex, missingPolicy);
        setSaveFieldValuesNumeric();   // Ask the framework to retain numeric values per key
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MedianCalculator(fieldIndex);
    }

    final class MedianCalculator : SingleFieldCalculator
    {
        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MedianOperator getOperator()
        {
            return this.outer;
        }

        /* No per-field work; values are captured by the saved-values machinery. */
        final override void processNextField(const char[] nextField)
        { }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            auto median = valuesLists.numericValuesMedian(fieldIndex);
            return printOptions.formatNumber(median);
        }
    }
}

unittest // MedianOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "nan" for the empty prefix -- TODO confirm against
     * testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["9.5"], ["7.5"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["9", "0", "-1.5"], ["4509", "-3", "12"]];

    testSingleFieldOperator!MedianOperator(col1File, 0, "median", ["nan", "10", "9.75", "9.5"]);
    testSingleFieldOperator!MedianOperator(col2File, 0, "median", ["nan", "20", "20.5", "21"]);
    testSingleFieldOperator!MedianOperator(col2File, 1, "median", ["nan", "-30", "-29.5", "-30"]);
    testSingleFieldOperator!MedianOperator(col3File, 0, "median", ["nan", "9009", "4509", "4509"]);
    testSingleFieldOperator!MedianOperator(col3File, 1, "median", ["nan", "9", "4.5", "0"]);
    testSingleFieldOperator!MedianOperator(col3File, 2, "median", ["nan", "-4.5", "-3", "-1.5"]);

    /* Missing-value handling: exclusion drops empty fields; replacement treats
     * them as "0".
     */
    auto col1misFile = [[""], ["10"], [""], ["9.5"], ["7.5"]];
    testSingleFieldOperator!MedianOperator(col1misFile, 0, "median", ["nan", "nan", "10", "10", "9.75", "9.5"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!MedianOperator(col1misFile, 0, "median", ["nan", "0", "5", "0", "4.75", "7.5"],
                                          new MissingFieldPolicy(false, "0"));  // Replace missing
}

/** QuantileOperator produces the value representing the data at a cumulative probability.
 * This is a numeric operation.
 *
 * For example, quantiles might be produced for the 0.25, 0.5, and 0.75 probabilities
 * (the 25th, 50th, and 75th percentile ranks, the 50th percentile being the median).
 * Data is sorted in ascending order. One operator instance handles one probability,
 * but multiple quantile ranks are commonly generated for the same field.
 *
 * The calculation needs the full set of values, so they are saved in memory via
 * the unique key value lists machinery.
 */
final class QuantileOperator : SingleFieldOperator
{
    private double _prob;   // Cumulative probability, in [0.0, 1.0]

    this(size_t fieldIndex, MissingFieldPolicy missingPolicy, double probability)
    {
        assert(0.0 <= probability && probability <= 1.0);
        import std.format : format;

        /* Output header is e.g. "pct25" for probability 0.25. Zero is special-cased
         * so the header reads "pct0" rather than a formatted zero.
         */
        string header;
        if (probability == 0.0) header = "pct0";
        else header = format("pct%02g", probability * 100.0);

        super(header, fieldIndex, missingPolicy);
        _prob = probability;
        setSaveFieldValuesNumeric();   // Ask the framework to retain numeric values per key
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new QuantileCalculator(fieldIndex);
    }

    final class QuantileCalculator : SingleFieldCalculator
    {
        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override QuantileOperator getOperator()
        {
            return this.outer;
        }

        /* No per-field work; values are captured by the saved-values machinery. */
        final override void processNextField(const char[] nextField)
        { }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            import tsv_utils.common.numerics : quantile;

            auto sortedValues = valuesLists.numericValuesSorted(fieldIndex);
            return printOptions.formatNumber(quantile(this.outer._prob, sortedValues));
        }
    }
}

unittest // QuantileOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "nan" for the empty prefix -- TODO confirm against
     * testSingleFieldOperatorBase (defined elsewhere in this file). The trailing
     * argument is the probability passed to the operator's constructor.
     */
    auto col1File = [["10"], ["9.5"], ["7.5"]];
    auto col2File = [["20", "-30"], ["21", "-29"], ["22", "-31"]];
    auto col3File = [["9009", "9", "-4.5"], ["9", "0", "-1.5"], ["4509", "-3", "12"]];

    auto defaultMissing = new MissingFieldPolicy;

    /* Same as the median tests. */
    testSingleFieldOperatorBase!QuantileOperator(col1File, 0, "pct50", ["nan", "10", "9.75", "9.5"], defaultMissing, 0.50);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 0, "pct50", ["nan", "20", "20.5", "21"], defaultMissing, 0.50);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 1, "pct50", ["nan", "-30", "-29.5", "-30"], defaultMissing, 0.50);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 0, "pct50", ["nan", "9009", "4509", "4509"], defaultMissing, 0.50);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 1, "pct50", ["nan", "9", "4.5", "0"], defaultMissing, 0.50);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 2, "pct50", ["nan", "-4.5", "-3", "-1.5"], defaultMissing, 0.50);

    /* The extremes (0, 1), are min and max. */
    testSingleFieldOperatorBase!QuantileOperator(col1File, 0, "pct0", ["nan", "10", "9.5", "7.5"], defaultMissing, 0.0);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 0, "pct0", ["nan", "20", "20", "20"], defaultMissing, 0.0);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 1, "pct0", ["nan", "-30", "-30", "-31"], defaultMissing, 0.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 0, "pct0", ["nan", "9009", "9", "9"], defaultMissing, 0.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 1, "pct0", ["nan", "9", "0", "-3"], defaultMissing, 0.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 2, "pct0", ["nan", "-4.5", "-4.5", "-4.5"], defaultMissing, 0.0);

    testSingleFieldOperatorBase!QuantileOperator(col1File, 0, "pct100", ["nan", "10", "10", "10"], defaultMissing, 1.0);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 0, "pct100", ["nan", "20", "21", "22"], defaultMissing, 1.0);
    testSingleFieldOperatorBase!QuantileOperator(col2File, 1, "pct100", ["nan", "-30", "-29", "-29"], defaultMissing, 1.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 0, "pct100", ["nan", "9009", "9009", "9009"], defaultMissing, 1.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 1, "pct100", ["nan", "9", "9", "9"], defaultMissing, 1.0);
    testSingleFieldOperatorBase!QuantileOperator(col3File, 2, "pct100", ["nan", "-4.5", "-1.5", "12"], defaultMissing, 1.0);

    /* For missing policies, re-use the median tests. */
    auto col1misFile = [[""], ["10"], [""], ["9.5"], ["7.5"]];
    testSingleFieldOperatorBase!QuantileOperator(col1misFile, 0, "pct50", ["nan", "nan", "10", "10", "9.75", "9.5"],
                                                 new MissingFieldPolicy(true, ""), 0.5);  // Exclude missing
    testSingleFieldOperatorBase!QuantileOperator(col1misFile, 0, "pct50", ["nan", "0", "5", "0", "4.75", "7.5"],
                                                 new MissingFieldPolicy(false, "0"), 0.5);  // Replace missing
}

/** MadOperator produces the median absolute deviation from the median. This is a
 * numeric operation.
 *
 * The result is the raw MAD value; no normalization constant is applied.
 *
 * The calculation needs the full set of values, so they are saved in memory via
 * the unique key value lists machinery.
 */
final class MadOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("mad", fieldIndex, missingPolicy);
        setSaveFieldValuesNumeric();   // Ask the framework to retain numeric values per key
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MadCalculator(fieldIndex);
    }

    final class MadCalculator : SingleFieldCalculator
    {
        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MadOperator getOperator()
        {
            return this.outer;
        }

        /* No per-field work; values are captured by the saved-values machinery. */
        final override void processNextField(const char[] nextField)
        { }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            import std.math : abs;
            import tsv_utils.common.numerics : rangeMedian;

            /* MAD = median of |value - median| over all saved values. */
            auto median = valuesLists.numericValuesMedian(fieldIndex);
            auto values = valuesLists.numericValues(fieldIndex);
            auto deviations = new double[values.length];
            foreach (i, v; values)
            {
                deviations[i] = abs(v - median);
            }

            return printOptions.formatNumber(deviations.rangeMedian);
        }
    }
}

unittest // MadOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "nan" for the empty prefix -- TODO confirm against
     * testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["10"], ["15"], ["20"], ["25"], ["30"]];
    auto col2File = [["2", "50"], ["2", "51"], ["2", "52"]];
    auto col3File = [["16", "8", "-4"], ["8", "8", "-2"], ["8", "16", "0"]];

    testSingleFieldOperator!MadOperator(col1File, 0, "mad", ["nan", "0", "2.5", "5", "5", "5"]);
    testSingleFieldOperator!MadOperator(col2File, 0, "mad", ["nan", "0", "0", "0"]);
    testSingleFieldOperator!MadOperator(col2File, 1, "mad", ["nan", "0", "0.5", "1"]);
    testSingleFieldOperator!MadOperator(col3File, 0, "mad", ["nan", "0", "4", "0"]);
    testSingleFieldOperator!MadOperator(col3File, 1, "mad", ["nan", "0", "0", "0"]);
    testSingleFieldOperator!MadOperator(col3File, 2, "mad", ["nan", "0", "1", "2"]);

    /* Missing-value handling: exclusion drops empty fields; replacement treats
     * them as "0".
     */
    auto col1misFile = [[""], ["16"], [""], ["32"], ["-4"]];
    testSingleFieldOperator!MadOperator(col1misFile, 0, "mad", ["nan", "nan", "0", "0", "8", "16"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!MadOperator(col1misFile, 0, "mad", ["nan", "0", "8", "0", "8", "4"],
                                          new MissingFieldPolicy(false, "0"));  // Replace missing
}

/** Generates the variance of the field's values. This is a numeric operator.
 *
 * Uses Welford's online algorithm: a single pass, no per-key value storage.
 * The sample variance (n - 1 divisor) is reported.
 */
final class VarianceOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("var", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new VarianceCalculator(fieldIndex);
    }

    final class VarianceCalculator : SingleFieldCalculator
    {
        private double _count = 0.0;   // Number of values seen (kept as double for the math)
        private double _mean = 0.0;    // Running mean
        private double _m2 = 0.0;      // Sum of squares of differences from the current mean

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override VarianceOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            /* Welford update: the count is bumped first, then the mean and M2 are
             * adjusted using deltas against the old and new mean respectively.
             */
            _count += 1.0;
            immutable double x = nextField.to!double;
            immutable double delta = x - _mean;
            _mean += delta / _count;
            _m2 += delta * (x - _mean);
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            /* Variance needs at least two values; otherwise NaN. */
            double variance = double.nan;
            if (_count >= 2.0) variance = _m2 / (_count - 1.0);
            return printOptions.formatNumber(variance);
        }
    }
}

unittest // VarianceOperator
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows;
     * the first two entries are "nan" because variance requires two values --
     * TODO confirm against testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["5"], ["10"], ["15"]];
    auto col2File = [["-5", "-5"], ["-10", "0"], ["-15", "5"]];
    auto col3File = [["1", "2", "100"], ["2", "3", "100"], ["3", "4", "103"]];

    testSingleFieldOperator!VarianceOperator(col1File, 0, "var", ["nan", "nan", "12.5", "25"]);
    testSingleFieldOperator!VarianceOperator(col2File, 0, "var", ["nan", "nan", "12.5", "25"]);
    testSingleFieldOperator!VarianceOperator(col2File, 1, "var", ["nan", "nan", "12.5", "25"]);
    testSingleFieldOperator!VarianceOperator(col3File, 0, "var", ["nan", "nan", "0.5", "1"]);
    testSingleFieldOperator!VarianceOperator(col3File, 1, "var", ["nan", "nan", "0.5", "1"]);
    testSingleFieldOperator!VarianceOperator(col3File, 2, "var", ["nan", "nan", "0", "3"]);

    /* Missing-value handling: exclusion drops the empty third row; replacement
     * substitutes "15" for it.
     */
    auto col1misFile = [["5"], ["10"], [""]];
    testSingleFieldOperator!VarianceOperator(col1misFile, 0, "var", ["nan", "nan", "12.5", "12.5"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!VarianceOperator(col1misFile, 0, "var", ["nan", "nan", "12.5", "25"],
                                          new MissingFieldPolicy(false, "15"));  // Replace missing
}

/** Generates the standard deviation of the field's values. This is a numeric operator.
 *
 * Uses Welford's online algorithm, identical to VarianceOperator, with a square
 * root applied at output time. The sample form (n - 1 divisor) is reported.
 */
final class StDevOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("stdev", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new StDevCalculator(fieldIndex);
    }

    final class StDevCalculator : SingleFieldCalculator
    {
        private double _count = 0.0;   // Number of values seen (kept as double for the math)
        private double _mean = 0.0;    // Running mean
        private double _m2 = 0.0;      // Sum of squares of differences from the current mean

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override StDevOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            /* Welford update, same as VarianceCalculator. */
            _count += 1.0;
            immutable double x = nextField.to!double;
            immutable double delta = x - _mean;
            _mean += delta / _count;
            _m2 += delta * (x - _mean);
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            import std.math : sqrt;

            /* Standard deviation needs at least two values; otherwise NaN. */
            double stdev = double.nan;
            if (_count >= 2.0) stdev = sqrt(_m2 / (_count - 1.0));
            return printOptions.formatNumber(stdev);
        }
    }
}

/* StDevOperator unit tests - These would be improved with a tolerance option.
 */
unittest
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows;
     * the first two entries are "nan" because stdev requires two values. The
     * irrational expectations are exact-string matches of the formatted output --
     * TODO confirm against testSingleFieldOperator (defined elsewhere in this file).
     */
    auto col1File = [["1"], ["4"], ["7"]];
    auto col2File = [["3", "3"], ["3", "9"], ["7", "15"]];
    auto col3File = [["11", "10", "10"], ["24", "22", "25"], ["37", "34", "40"]];

    testSingleFieldOperator!StDevOperator(col1File, 0, "stdev", ["nan", "nan", "2.12132034356", "3"]);
    testSingleFieldOperator!StDevOperator(col2File, 0, "stdev", ["nan", "nan", "0", "2.30940107676"]);
    testSingleFieldOperator!StDevOperator(col2File, 1, "stdev", ["nan", "nan", "4.24264068712", "6"]);
    testSingleFieldOperator!StDevOperator(col3File, 0, "stdev", ["nan", "nan", "9.19238815543", "13"]);
    testSingleFieldOperator!StDevOperator(col3File, 1, "stdev", ["nan", "nan", "8.48528137424", "12"]);
    testSingleFieldOperator!StDevOperator(col3File, 2, "stdev", ["nan", "nan", "10.6066017178", "15"]);

    /* Missing-value handling: exclusion drops the empty third row; replacement
     * substitutes "7" for it.
     */
    auto col1misFile = [["1"], ["4"], [""]];
    testSingleFieldOperator!StDevOperator(col1misFile, 0, "stdev", ["nan", "nan", "2.12132034356", "2.12132034356"],
                                          new MissingFieldPolicy(true, ""));  // Exclude missing
    testSingleFieldOperator!StDevOperator(col1misFile, 0, "stdev", ["nan", "nan", "2.12132034356", "3"],
                                          new MissingFieldPolicy(false, "7"));  // Replace missing
}

/** UniqueCountOperator generates the number of unique values. Uniqueness is
 * based on exact text match, not numeric comparison.
 *
 * Every distinct field value is stored in memory as part of this calculation.
 */
final class UniqueCountOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("unique_count", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new UniqueCountCalculator(fieldIndex);
    }

    final class UniqueCountCalculator : SingleFieldCalculator
    {
        private bool[string] _values;   // Set of distinct values seen (AA used as a set)

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override UniqueCountOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            /* Membership is checked first so a GC string is only allocated for
             * values not seen before.
             */
            if (nextField in _values) return;
            _values[nextField.to!string] = true;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return printOptions.formatNumber(_values.length);
        }
    }
}

unittest // UniqueCount
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows,
     * starting with "0" for the empty prefix -- TODO confirm against
     * testSingleFieldOperator (defined elsewhere in this file). Note that "1.0",
     * "1", and "2.0"/"2" count as distinct values: comparison is textual.
     */
    auto col1File = [["a"], ["b"], ["c"], ["c"], ["b"], ["b"], ["a"], ["ab"]];
    auto col2File = [["abc", "pqr"], ["def", "pqr"], ["def", "xyz"]];
    auto col3File = [["1.0", "1", "a"], ["2.0", "a", "1"], ["2", "a", "1.0"]];

    testSingleFieldOperator!UniqueCountOperator(col1File, 0, "unique_count", ["0", "1", "2", "3", "3", "3", "3", "3", "4"]);
    testSingleFieldOperator!UniqueCountOperator(col2File, 0, "unique_count", ["0", "1", "2", "2"]);
    testSingleFieldOperator!UniqueCountOperator(col2File, 1, "unique_count", ["0", "1", "1", "2"]);
    testSingleFieldOperator!UniqueCountOperator(col3File, 0, "unique_count", ["0", "1", "2", "3"]);
    testSingleFieldOperator!UniqueCountOperator(col3File, 1, "unique_count", ["0", "1", "2", "2"]);
    testSingleFieldOperator!UniqueCountOperator(col3File, 2, "unique_count", ["0", "1", "2", "3"]);

    /* Missing-value handling: exclusion skips empty fields; replacement counts
     * the replacement string "XYZ" as one more unique value.
     */
    auto col1misFile = [[""], ["a"], [""], ["b"], ["c"], ["c"], ["b"], ["b"], ["a"], ["ab"]];
    testSingleFieldOperator!UniqueCountOperator(col1misFile, 0, "unique_count", ["0", "0", "1", "1", "2", "3", "3", "3", "3", "3", "4"],
                                                new MissingFieldPolicy(true, ""));  // Exclude missing


    testSingleFieldOperator!UniqueCountOperator(col1misFile, 0, "unique_count", ["0", "1", "2", "2", "3", "4", "4", "4", "4", "4", "5"],
                                                new MissingFieldPolicy(false, "XYZ"));  // Replace missing
}

/** MissingCountOperator generates the number of missing values. This overrides
 * the global missingFieldsPolicy.
 *
 * The operator installs its own pass-through policy (no exclusion, empty
 * replacement) so every raw field reaches the calculator; the global policy is
 * retained solely to classify fields as missing.
 */
final class MissingCountOperator : SingleFieldOperator
{
    private MissingFieldPolicy _globalMissingPolicy;   // Used only for missing-field classification

    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        _globalMissingPolicy = missingPolicy;
        super("missing_count", fieldIndex, new MissingFieldPolicy(false, ""));
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new MissingCountCalculator(fieldIndex);
    }

    final class MissingCountCalculator : SingleFieldCalculator
    {
        private size_t _missingCount = 0;   // Number of fields classified as missing

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override MissingCountOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            auto globalPolicy = this.outer._globalMissingPolicy;
            if (globalPolicy.isMissingField(nextField)) ++_missingCount;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return printOptions.formatNumber(_missingCount);
        }
    }
}

unittest // MissingCount
{
    /* Expected-results arrays appear to be cumulative outputs per prefix of rows --
     * TODO confirm against testSingleFieldOperator (defined elsewhere in this
     * file). Note " " (a space) is not counted as missing; only "" is.
     */
    auto col1File = [["a"], ["b"], [""], [" "], [""]];
    auto col2File = [["abc", ""], ["", ""], ["def", ""]];
    auto col3File = [["", "1", "a"], ["2.0", "", "1"], ["2", "", ""]];

    testSingleFieldOperator!MissingCountOperator(col1File, 0, "missing_count", ["0", "0", "0", "1", "1", "2"]);
    testSingleFieldOperator!MissingCountOperator(col2File, 0, "missing_count", ["0", "0", "1", "1"]);
    testSingleFieldOperator!MissingCountOperator(col2File, 1, "missing_count", ["0", "1", "2", "3"]);
    testSingleFieldOperator!MissingCountOperator(col3File, 0, "missing_count", ["0", "1", "1", "1"]);
    testSingleFieldOperator!MissingCountOperator(col3File, 1, "missing_count", ["0", "0", "1", "2"]);
    testSingleFieldOperator!MissingCountOperator(col3File, 2, "missing_count", ["0", "0", "0", "1"]);

    /* The global policy must not affect results: exclude and replace policies
     * produce the same counts as the default.
     */
    auto excludeMissing = new MissingFieldPolicy(true, "");
    auto replaceMissing = new MissingFieldPolicy(false, "X");

    testSingleFieldOperator!MissingCountOperator(col1File, 0, "missing_count", ["0", "0", "0", "1", "1", "2"], excludeMissing);
    testSingleFieldOperator!MissingCountOperator(col2File, 0, "missing_count", ["0", "0", "1", "1"], excludeMissing);
    testSingleFieldOperator!MissingCountOperator(col2File, 1, "missing_count", ["0", "1", "2", "3"], excludeMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 0, "missing_count", ["0", "1", "1", "1"], excludeMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 1, "missing_count", ["0", "0", "1", "2"], excludeMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 2, "missing_count", ["0", "0", "0", "1"], excludeMissing);

    testSingleFieldOperator!MissingCountOperator(col1File, 0, "missing_count", ["0", "0", "0", "1", "1", "2"], replaceMissing);
    testSingleFieldOperator!MissingCountOperator(col2File, 0, "missing_count", ["0", "0", "1", "1"], replaceMissing);
    testSingleFieldOperator!MissingCountOperator(col2File, 1, "missing_count", ["0", "1", "2", "3"], replaceMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 0, "missing_count", ["0", "1", "1", "1"], replaceMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 1, "missing_count", ["0", "0", "1", "2"], replaceMissing);
    testSingleFieldOperator!MissingCountOperator(col3File, 2, "missing_count", ["0", "0", "0", "1"], replaceMissing);
}

/** NotMissingCountOperator generates the number of not-missing values. This
 * overrides the global missingFieldsPolicy.
 *
 * The operator installs its own pass-through policy (no exclusion, empty
 * replacement) so every raw field reaches the calculator; the global policy is
 * retained solely to classify fields as missing.
 */
final class NotMissingCountOperator : SingleFieldOperator
{
    private MissingFieldPolicy _globalMissingPolicy;   // Used only for missing-field classification

    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        _globalMissingPolicy = missingPolicy;
        super("not_missing_count", fieldIndex, new MissingFieldPolicy(false, ""));
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new NotMissingCountCalculator(fieldIndex);
    }

    final class NotMissingCountCalculator : SingleFieldCalculator
    {
        private size_t _notMissingCount = 0;   // Number of fields classified as not-missing

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override NotMissingCountOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            auto globalPolicy = this.outer._globalMissingPolicy;
            if (!globalPolicy.isMissingField(nextField)) ++_notMissingCount;
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return printOptions.formatNumber(_notMissingCount);
        }
    }
}

unittest // NotMissingCount
{
    /* Input fixtures: each inner array is one row; the tested column is selected
     * by the field index argument.
     */
    auto col1File = [["a"], ["b"], [""], [" "], [""]];
    auto col2File = [["abc", ""], ["", ""], ["def", ""]];
    auto col3File = [["", "1", "a"], ["2.0", "", "1"], ["2", "", ""]];

    /* NOTE(review): each expected array has one more entry than the file has rows,
     * so it appears to hold the operator's output after each successive prefix of
     * rows, starting with zero rows — confirm against testSingleFieldOperator.
     */
    testSingleFieldOperator!NotMissingCountOperator(col1File, 0, "not_missing_count", ["0", "1", "2", "2", "3", "3"]);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 0, "not_missing_count", ["0", "1", "1", "2"]);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 1, "not_missing_count", ["0", "0", "0", "0"]);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 0, "not_missing_count", ["0", "0", "1", "2"]);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 1, "not_missing_count", ["0", "1", "1", "1"]);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 2, "not_missing_count", ["0", "1", "2", "2"]);

    auto excludeMissing = new MissingFieldPolicy(true, "");
    auto replaceMissing = new MissingFieldPolicy(false, "X");

    /* The expected results below are identical to the default-policy runs above:
     * the operator overrides the global missing-field policy, so exclude/replace
     * settings must not change its output.
     */
    testSingleFieldOperator!NotMissingCountOperator(col1File, 0, "not_missing_count", ["0", "1", "2", "2", "3", "3"], excludeMissing);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 0, "not_missing_count", ["0", "1", "1", "2"], excludeMissing);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 1, "not_missing_count", ["0", "0", "0", "0"], excludeMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 0, "not_missing_count", ["0", "0", "1", "2"], excludeMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 1, "not_missing_count", ["0", "1", "1", "1"], excludeMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 2, "not_missing_count", ["0", "1", "2", "2"], excludeMissing);

    testSingleFieldOperator!NotMissingCountOperator(col1File, 0, "not_missing_count", ["0", "1", "2", "2", "3", "3"], replaceMissing);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 0, "not_missing_count", ["0", "1", "1", "2"], replaceMissing);
    testSingleFieldOperator!NotMissingCountOperator(col2File, 1, "not_missing_count", ["0", "0", "0", "0"], replaceMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 0, "not_missing_count", ["0", "0", "1", "2"], replaceMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 1, "not_missing_count", ["0", "1", "1", "1"], replaceMissing);
    testSingleFieldOperator!NotMissingCountOperator(col3File, 2, "not_missing_count", ["0", "1", "2", "2"], replaceMissing);
}

/** ModeOperator outputs the most frequent value seen. In the event of a tie, the
 * first value seen is produced.
 *
 * All the field values are stored in memory as part of this calculation.
 */
final class ModeOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("mode", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new ModeCalculator(fieldIndex);
    }

    final class ModeCalculator : SingleFieldCalculator
    {
        /* Occurrence count for each distinct value. */
        private size_t[string] _valueCounts;
        /* Distinct values in first-seen order; drives the tie-break rule. */
        private Appender!(string[]) _uniqueValues;

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override ModeOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            auto countPtr = (nextField in _valueCounts);

            if (countPtr is null)
            {
                /* First sighting: copy the value so it outlives the input buffer. */
                string value = nextField.to!string;
                _uniqueValues.put(value);
                _valueCounts[value] = 1;
            }
            else
            {
                (*countPtr)++;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            string modeValue = "";
            size_t modeCount = 0;

            /* Iterate in first-seen order; the strict '>' keeps the earliest value
             * on ties. A single AA lookup replaces the previous assert-then-index
             * double lookup.
             */
            foreach (value; _uniqueValues.data)
            {
                auto countPtr = (value in _valueCounts);
                assert(countPtr !is null);

                if (*countPtr > modeCount)
                {
                    modeValue = value;
                    modeCount = *countPtr;
                }
            }

            return modeValue;
        }
    }
}

unittest // ModeOperator
{
    /* Input fixtures: each inner array is one row; the tested column is selected
     * by the field index argument.
     */
    auto col1File = [["a"], ["b"], ["c"], ["c"], ["b"], ["b"], ["a"]];
    auto col2File = [["abc", "pqr"], ["def", "pqr"], ["def", "xyz"]];
    auto col3File = [["1.0", "1", "a"], ["2.0", "a", "1"], ["2", "a", "1.0"]];

    /* NOTE(review): expected arrays appear to hold the mode after each successive
     * prefix of rows (first entry: zero rows). On ties the first-seen value wins,
     * e.g. col3File field 0 stays "1.0" even after "2.0" and "2" each appear once.
     */
    testSingleFieldOperator!ModeOperator(col1File, 0, "mode", ["", "a", "a", "a", "c", "b", "b", "b"]);
    testSingleFieldOperator!ModeOperator(col2File, 0, "mode", ["", "abc", "abc", "def"]);
    testSingleFieldOperator!ModeOperator(col2File, 1, "mode", ["", "pqr", "pqr", "pqr"]);
    testSingleFieldOperator!ModeOperator(col3File, 0, "mode", ["", "1.0", "1.0", "1.0"]);
    testSingleFieldOperator!ModeOperator(col3File, 1, "mode", ["", "1", "1", "a"]);
    testSingleFieldOperator!ModeOperator(col3File, 2, "mode", ["", "a", "a", "a"]);

    /* Missing-field policy interaction: excluded empties never enter the counts;
     * replaced empties are counted as the replacement value "X".
     */
    auto col1misFile = [[""], ["a"], [""], ["b"], ["c"], ["c"], ["b"], ["b"]];
    testSingleFieldOperator!ModeOperator(col1misFile, 0, "mode", ["", "", "a", "a", "a", "a", "c", "b", "b"],
                                         new MissingFieldPolicy(true, ""));  // Exclude missing


    testSingleFieldOperator!ModeOperator(col1misFile, 0, "mode", ["", "X", "X", "X", "X", "X", "X", "X", "b"],
                                         new MissingFieldPolicy(false, "X"));  // Replace missing
}

/** ModeCountOperator outputs the number of occurrences of the most frequent value.
 *
 * Every distinct field value is kept in memory while counting.
 */
final class ModeCountOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("mode_count", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new ModeCountCalculator(fieldIndex);
    }

    final class ModeCountCalculator : SingleFieldCalculator
    {
        /* Number of occurrences of each distinct value. */
        private size_t[string] _valueCounts;

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override ModeCountOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            /* Bump the existing counter when the value is known; otherwise make a
             * durable copy of the value and start its count at one.
             */
            if (auto countPtr = nextField in _valueCounts)
            {
                ++(*countPtr);
            }
            else
            {
                _valueCounts[nextField.to!string] = 1;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            size_t largest = 0;
            foreach (n; _valueCounts.byValue)
            {
                if (n > largest) largest = n;
            }
            return printOptions.formatNumber(largest);
        }
    }
}

unittest // ModeCountOperator
{
    /* Input fixtures: each inner array is one row; the tested column is selected
     * by the field index argument.
     */
    auto col1File = [["a"], ["b"], ["c"], ["c"], ["b"], ["b"], ["a"]];
    auto col2File = [["abc", ""], ["def", ""], ["def", "xyz"]];
    auto col3File = [["1.0", "1", "a"], ["2.0", "a", "1"], ["2", "a", "1.0"]];

    /* NOTE(review): expected arrays appear to hold the highest occurrence count
     * after each successive prefix of rows (first entry: zero rows).
     */
    testSingleFieldOperator!ModeCountOperator(col1File, 0, "mode_count", ["0", "1", "1", "1", "2", "2", "3", "3"]);
    testSingleFieldOperator!ModeCountOperator(col2File, 0, "mode_count", ["0", "1", "1", "2"]);
    testSingleFieldOperator!ModeCountOperator(col2File, 1, "mode_count", ["0", "1", "2", "2"]);
    testSingleFieldOperator!ModeCountOperator(col3File, 0, "mode_count", ["0", "1", "1", "1"]);
    testSingleFieldOperator!ModeCountOperator(col3File, 1, "mode_count", ["0", "1", "1", "2"]);
    testSingleFieldOperator!ModeCountOperator(col3File, 2, "mode_count", ["0", "1", "1", "1"]);

    /* Missing-field policy interaction: excluded empties never enter the counts;
     * replaced empties are counted as the replacement value "X".
     */
    auto col1misFile = [[""], ["a"], [""], ["b"], ["c"], ["c"], ["b"], ["b"]];
    testSingleFieldOperator!ModeCountOperator(col1misFile, 0, "mode_count", ["0", "0", "1", "1", "1", "1", "2", "2", "3"],
                                              new MissingFieldPolicy(true, ""));  // Exclude missing


    testSingleFieldOperator!ModeCountOperator(col1misFile, 0, "mode_count", ["0", "1", "1", "2", "2", "2", "2", "2", "3"],
                                              new MissingFieldPolicy(false, "X"));  // Replace missing
}

/** ValuesOperator prints every value seen, separated by an alternate delimiter.
 *
 * The values themselves are not stored by this class. The shared unique-key
 * value lists keep the raw field text (requested via setSaveFieldValuesText),
 * and this operator joins that stored text at output time.
 */

final class ValuesOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("values", fieldIndex, missingPolicy);
        setSaveFieldValuesText();
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new ValuesCalculator(fieldIndex);
    }

    final class ValuesCalculator : SingleFieldCalculator
    {
        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override ValuesOperator getOperator()
        {
            return this.outer;
        }

        /* No per-field work; the framework stores the text values. */
        final override void processNextField(const char[] nextField)
        { }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            auto savedValues = valuesLists.textValues(fieldIndex);
            return savedValues.join(printOptions.valuesDelimiter);
        }
    }
}

unittest // ValuesOperator
{
    /* Input fixtures: each inner array is one row; the tested column is selected
     * by the field index argument.
     */
    auto col1File = [["a"], [""], ["b"], ["cd"], ["e"], [""], ["a"]];
    auto col2File = [["", "50"], ["", "51"], ["xyz", "52"]];
    auto col3File = [["z", "a", "-"], ["y", "ab", "--"], ["w", "ba", "---"]];

    /* NOTE(review): expected arrays appear to hold the '|'-joined values after each
     * successive prefix of rows (first entry: zero rows). Empty fields are kept,
     * producing adjacent delimiters such as "a||b".
     */
    testSingleFieldOperator!ValuesOperator(col1File, 0, "values", ["", "a", "a|", "a||b", "a||b|cd", "a||b|cd|e", "a||b|cd|e|", "a||b|cd|e||a"]);
    testSingleFieldOperator!ValuesOperator(col2File, 0, "values", ["", "", "|", "||xyz"]);
    testSingleFieldOperator!ValuesOperator(col2File, 1, "values", ["", "50", "50|51", "50|51|52"]);
    testSingleFieldOperator!ValuesOperator(col3File, 0, "values", ["", "z", "z|y", "z|y|w"]);
    testSingleFieldOperator!ValuesOperator(col3File, 1, "values", ["", "a", "a|ab", "a|ab|ba"]);
    testSingleFieldOperator!ValuesOperator(col3File, 2, "values", ["", "-", "-|--", "-|--|---"]);

    /* Missing-field policy interaction: excluded empties are dropped from the
     * output; replaced empties appear as the replacement value "X".
     */
    testSingleFieldOperator!ValuesOperator(col1File, 0, "values", ["", "a", "a", "a|b", "a|b|cd", "a|b|cd|e", "a|b|cd|e", "a|b|cd|e|a"],
                                         new MissingFieldPolicy(true, ""));  // Exclude missing


    testSingleFieldOperator!ValuesOperator(col1File, 0, "values", ["", "a", "a|X", "a|X|b", "a|X|b|cd", "a|X|b|cd|e", "a|X|b|cd|e|X", "a|X|b|cd|e|X|a"],
                                         new MissingFieldPolicy(false, "X"));  // Replace missing
}

/** UniqueValuesOperator outputs each unique value delimited by an alternate delimiter
 * character. Values are output in the order seen.
 *
 * All unique field values are stored in memory as part of this calculation.
 */
final class UniqueValuesOperator : SingleFieldOperator
{
    this(size_t fieldIndex, MissingFieldPolicy missingPolicy)
    {
        super("unique_values", fieldIndex, missingPolicy);
    }

    final override SingleFieldCalculator makeCalculator()
    {
        return new UniqueValuesCalculator(fieldIndex);
    }

    final class UniqueValuesCalculator : SingleFieldCalculator
    {
        /* Membership set of values already seen. The previous size_t payload was
         * written but never read; a bool makes the set intent explicit.
         */
        private bool[string] _seenValues;
        /* Unique values in first-seen order, which is also the output order. */
        private Appender!(string[]) _uniqueValues;

        this(size_t fieldIndex)
        {
            super(fieldIndex);
        }

        final override UniqueValuesOperator getOperator()
        {
            return this.outer;
        }

        final override void processNextField(const char[] nextField)
        {
            if ((nextField in _seenValues) is null)
            {
                /* Copy so the key outlives the transient input buffer. */
                string value = nextField.to!string;
                _uniqueValues.put(value);
                _seenValues[value] = true;
            }
        }

        final string calculate(UniqueKeyValuesLists valuesLists, const ref SummarizerPrintOptions printOptions)
        {
            return _uniqueValues.data.join(printOptions.valuesDelimiter);
        }
    }
}

unittest // UniqueValuesOperator
{
    /* Input fixtures: each inner array is one row; the tested column is selected
     * by the field index argument.
     */
    auto col1File = [["a"], [""], ["b"], ["cd"], ["e"], [""], ["a"]];
    auto col2File = [["", "50"], ["", "50"], ["xyz", "52"]];
    auto col3File = [["z", "a", "-"], ["y", "ab", "--"], ["w", "ba", "-"]];

    /* NOTE(review): expected arrays appear to hold the '|'-joined unique values
     * after each successive prefix of rows (first entry: zero rows). Repeats do
     * not change the output, e.g. col1File's trailing "" and "a" add nothing.
     */
    testSingleFieldOperator!UniqueValuesOperator(col1File, 0, "unique_values", ["", "a", "a|", "a||b", "a||b|cd", "a||b|cd|e", "a||b|cd|e", "a||b|cd|e"]);
    testSingleFieldOperator!UniqueValuesOperator(col2File, 0, "unique_values", ["", "", "", "|xyz"]);
    testSingleFieldOperator!UniqueValuesOperator(col2File, 1, "unique_values", ["", "50", "50", "50|52"]);
    testSingleFieldOperator!UniqueValuesOperator(col3File, 0, "unique_values", ["", "z", "z|y", "z|y|w"]);
    testSingleFieldOperator!UniqueValuesOperator(col3File, 1, "unique_values", ["", "a", "a|ab", "a|ab|ba"]);
    testSingleFieldOperator!UniqueValuesOperator(col3File, 2, "unique_values", ["", "-", "-|--", "-|--"]);

    /* Missing-field policy interaction: excluded empties never join the unique
     * set; replaced empties contribute the replacement value "X" once.
     */
    testSingleFieldOperator!UniqueValuesOperator(col1File, 0, "unique_values", ["", "a", "a", "a|b", "a|b|cd", "a|b|cd|e", "a|b|cd|e", "a|b|cd|e"],
                                                 new MissingFieldPolicy(true, ""));  // Exclude missing


    testSingleFieldOperator!UniqueValuesOperator(col1File, 0, "unique_values", ["", "a", "a|X", "a|X|b", "a|X|b|cd", "a|X|b|cd|e", "a|X|b|cd|e", "a|X|b|cd|e"],
                                                 new MissingFieldPolicy(false, "X"));  // Replace missing
}
